Merge remote-tracking branch 'anasazi/io'

Conflicts:
	src/libstd/rt/test.rs
This commit is contained in:
Brian Anderson 2013-07-08 15:42:49 -07:00
commit b227583dad
12 changed files with 1672 additions and 687 deletions

View File

@ -8,7 +8,10 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
type Port = u16;
#[deriving(Eq, TotalEq)]
pub enum IpAddr {
Ipv4(u8, u8, u8, u8, u16),
Ipv6
Ipv4(u8, u8, u8, u8, Port),
Ipv6(u16, u16, u16, u16, u16, u16, u16, u16, Port)
}

View File

@ -18,15 +18,11 @@ use rt::rtio::{IoFactory, IoFactoryObject,
RtioTcpStream, RtioTcpStreamObject};
use rt::local::Local;
pub struct TcpStream {
rtstream: ~RtioTcpStreamObject
}
pub struct TcpStream(~RtioTcpStreamObject);
impl TcpStream {
fn new(s: ~RtioTcpStreamObject) -> TcpStream {
TcpStream {
rtstream: s
}
TcpStream(s)
}
pub fn connect(addr: IpAddr) -> Option<TcpStream> {
@ -38,13 +34,11 @@ impl TcpStream {
};
match stream {
Ok(s) => {
Some(TcpStream::new(s))
}
Ok(s) => Some(TcpStream::new(s)),
Err(ioerr) => {
rtdebug!("failed to connect: %?", ioerr);
io_error::cond.raise(ioerr);
return None;
None
}
}
}
@ -52,8 +46,7 @@ impl TcpStream {
impl Reader for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
let bytes_read = self.rtstream.read(buf);
match bytes_read {
match (**self).read(buf) {
Ok(read) => Some(read),
Err(ioerr) => {
// EOF is indicated by returning None
@ -70,8 +63,7 @@ impl Reader for TcpStream {
impl Writer for TcpStream {
fn write(&mut self, buf: &[u8]) {
let res = self.rtstream.write(buf);
match res {
match (**self).write(buf) {
Ok(_) => (),
Err(ioerr) => {
io_error::cond.raise(ioerr);
@ -82,9 +74,7 @@ impl Writer for TcpStream {
fn flush(&mut self) { fail!() }
}
pub struct TcpListener {
rtlistener: ~RtioTcpListenerObject,
}
pub struct TcpListener(~RtioTcpListenerObject);
impl TcpListener {
pub fn bind(addr: IpAddr) -> Option<TcpListener> {
@ -93,11 +83,7 @@ impl TcpListener {
(*io).tcp_bind(addr)
};
match listener {
Ok(l) => {
Some(TcpListener {
rtlistener: l
})
}
Ok(l) => Some(TcpListener(l)),
Err(ioerr) => {
io_error::cond.raise(ioerr);
return None;
@ -108,8 +94,7 @@ impl TcpListener {
impl Listener<TcpStream> for TcpListener {
fn accept(&mut self) -> Option<TcpStream> {
let rtstream = self.rtlistener.accept();
match rtstream {
match (**self).accept() {
Ok(s) => {
Some(TcpStream::new(s))
}
@ -163,7 +148,7 @@ mod test {
}
#[test]
fn smoke_test() {
fn smoke_test_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();
@ -183,7 +168,27 @@ mod test {
}
#[test]
fn read_eof() {
fn smoke_test_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == 99);
}
do spawntask_immediately {
let mut stream = TcpStream::connect(addr);
stream.write([99]);
}
}
}
#[test]
fn read_eof_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();
@ -203,7 +208,27 @@ mod test {
}
#[test]
fn read_eof_twice() {
fn read_eof_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
let nread = stream.read(buf);
assert!(nread.is_none());
}
do spawntask_immediately {
let _stream = TcpStream::connect(addr);
// Close
}
}
}
#[test]
fn read_eof_twice_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();
@ -225,7 +250,29 @@ mod test {
}
#[test]
fn write_close() {
fn read_eof_twice_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
let nread = stream.read(buf);
assert!(nread.is_none());
let nread = stream.read(buf);
assert!(nread.is_none());
}
do spawntask_immediately {
let _stream = TcpStream::connect(addr);
// Close
}
}
}
#[test]
fn write_close_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();
@ -254,7 +301,36 @@ mod test {
}
#[test]
fn multiple_connect_serial() {
fn write_close_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let buf = [0];
loop {
let mut stop = false;
do io_error::cond.trap(|e| {
// NB: ECONNRESET on linux, EPIPE on mac
assert!(e.kind == ConnectionReset || e.kind == BrokenPipe);
stop = true;
}).in {
stream.write(buf);
}
if stop { break }
}
}
do spawntask_immediately {
let _stream = TcpStream::connect(addr);
// Close
}
}
}
#[test]
fn multiple_connect_serial_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();
let max = 10;
@ -279,7 +355,32 @@ mod test {
}
#[test]
fn multiple_connect_interleaved_greedy_schedule() {
fn multiple_connect_serial_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();
let max = 10;
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
for max.times {
let mut stream = listener.accept();
let mut buf = [0];
stream.read(buf);
assert_eq!(buf[0], 99);
}
}
do spawntask_immediately {
for max.times {
let mut stream = TcpStream::connect(addr);
stream.write([99]);
}
}
}
}
#[test]
fn multiple_connect_interleaved_greedy_schedule_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();
static MAX: int = 10;
@ -318,7 +419,46 @@ mod test {
}
#[test]
fn multiple_connect_interleaved_lazy_schedule() {
fn multiple_connect_interleaved_greedy_schedule_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();
static MAX: int = 10;
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
for int::range(0, MAX) |i| {
let stream = Cell::new(listener.accept());
rtdebug!("accepted");
// Start another task to handle the connection
do spawntask_immediately {
let mut stream = stream.take();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == i as u8);
rtdebug!("read");
}
}
}
connect(0, addr);
fn connect(i: int, addr: IpAddr) {
if i == MAX { return }
do spawntask_immediately {
rtdebug!("connecting");
let mut stream = TcpStream::connect(addr);
// Connect again before writing
connect(i + 1, addr);
rtdebug!("writing");
stream.write([i as u8]);
}
}
}
}
#[test]
fn multiple_connect_interleaved_lazy_schedule_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();
static MAX: int = 10;
@ -355,5 +495,43 @@ mod test {
}
}
}
#[test]
fn multiple_connect_interleaved_lazy_schedule_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();
static MAX: int = 10;
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
for int::range(0, MAX) |_| {
let stream = Cell::new(listener.accept());
rtdebug!("accepted");
// Start another task to handle the connection
do spawntask_later {
let mut stream = stream.take();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == 99);
rtdebug!("read");
}
}
}
connect(0, addr);
fn connect(i: int, addr: IpAddr) {
if i == MAX { return }
do spawntask_later {
rtdebug!("connecting");
let mut stream = TcpStream::connect(addr);
// Connect again before writing
connect(i + 1, addr);
rtdebug!("writing");
stream.write([99]);
}
}
}
}
}

View File

@ -8,38 +8,247 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
use super::super::*;
use super::ip::IpAddr;
use option::{Option, Some, None};
use result::{Ok, Err};
use rt::io::net::ip::IpAddr;
use rt::io::{Reader, Writer};
use rt::io::{io_error, read_error, EndOfFile};
use rt::rtio::{RtioUdpSocketObject, RtioUdpSocket, IoFactory, IoFactoryObject};
use rt::local::Local;
pub struct UdpStream;
pub struct UdpSocket(~RtioUdpSocketObject);
impl UdpStream {
pub fn connect(_addr: IpAddr) -> Option<UdpStream> {
fail!()
impl UdpSocket {
pub fn bind(addr: IpAddr) -> Option<UdpSocket> {
let socket = unsafe { (*Local::unsafe_borrow::<IoFactoryObject>()).udp_bind(addr) };
match socket {
Ok(s) => Some(UdpSocket(s)),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
}
}
pub fn recvfrom(&self, buf: &mut [u8]) -> Option<(uint, IpAddr)> {
match (**self).recvfrom(buf) {
Ok((nread, src)) => Some((nread, src)),
Err(ioerr) => {
// EOF is indicated by returning None
if ioerr.kind != EndOfFile {
read_error::cond.raise(ioerr);
}
None
}
}
}
pub fn sendto(&self, buf: &[u8], dst: IpAddr) {
match (**self).sendto(buf, dst) {
Ok(_) => (),
Err(ioerr) => io_error::cond.raise(ioerr),
}
}
pub fn connect(self, other: IpAddr) -> UdpStream {
UdpStream { socket: self, connectedTo: other }
}
}
pub struct UdpStream {
socket: UdpSocket,
connectedTo: IpAddr
}
impl UdpStream {
pub fn as_socket<T>(&self, f: &fn(&UdpSocket) -> T) -> T { f(&self.socket) }
pub fn disconnect(self) -> UdpSocket { self.socket }
}
impl Reader for UdpStream {
fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() }
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
do self.as_socket |sock| {
match sock.recvfrom(buf) {
Some((_nread, src)) if src != self.connectedTo => Some(0),
Some((nread, _src)) => Some(nread),
None => None,
}
}
}
fn eof(&mut self) -> bool { fail!() }
}
impl Writer for UdpStream {
fn write(&mut self, _buf: &[u8]) { fail!() }
fn write(&mut self, buf: &[u8]) {
do self.as_socket |sock| {
sock.sendto(buf, self.connectedTo);
}
}
fn flush(&mut self) { fail!() }
}
pub struct UdpListener;
#[cfg(test)]
mod test {
use super::*;
use rt::test::*;
use rt::io::net::ip::Ipv4;
use rt::io::*;
use option::{Some, None};
impl UdpListener {
pub fn bind(_addr: IpAddr) -> Option<UdpListener> {
fail!()
#[test] #[ignore]
fn bind_error() {
do run_in_newsched_task {
let mut called = false;
do io_error::cond.trap(|e| {
assert!(e.kind == PermissionDenied);
called = true;
}).in {
let addr = Ipv4(0, 0, 0, 0, 1);
let socket = UdpSocket::bind(addr);
assert!(socket.is_none());
}
assert!(called);
}
}
#[test]
fn socket_smoke_test_ip4() {
do run_in_newsched_task {
let server_ip = next_test_ip4();
let client_ip = next_test_ip4();
do spawntask_immediately {
match UdpSocket::bind(server_ip) {
Some(server) => {
let mut buf = [0];
match server.recvfrom(buf) {
Some((nread, src)) => {
assert_eq!(nread, 1);
assert_eq!(buf[0], 99);
assert_eq!(src, client_ip);
}
None => fail!()
}
}
None => fail!()
}
}
do spawntask_immediately {
match UdpSocket::bind(client_ip) {
Some(client) => client.sendto([99], server_ip),
None => fail!()
}
}
}
}
#[test]
fn socket_smoke_test_ip6() {
do run_in_newsched_task {
let server_ip = next_test_ip6();
let client_ip = next_test_ip6();
do spawntask_immediately {
match UdpSocket::bind(server_ip) {
Some(server) => {
let mut buf = [0];
match server.recvfrom(buf) {
Some((nread, src)) => {
assert_eq!(nread, 1);
assert_eq!(buf[0], 99);
assert_eq!(src, client_ip);
}
None => fail!()
}
}
None => fail!()
}
}
do spawntask_immediately {
match UdpSocket::bind(client_ip) {
Some(client) => client.sendto([99], server_ip),
None => fail!()
}
}
}
}
#[test]
fn stream_smoke_test_ip4() {
do run_in_newsched_task {
let server_ip = next_test_ip4();
let client_ip = next_test_ip4();
do spawntask_immediately {
match UdpSocket::bind(server_ip) {
Some(server) => {
let server = ~server;
let mut stream = server.connect(client_ip);
let mut buf = [0];
match stream.read(buf) {
Some(nread) => {
assert_eq!(nread, 1);
assert_eq!(buf[0], 99);
}
None => fail!()
}
}
None => fail!()
}
}
do spawntask_immediately {
match UdpSocket::bind(client_ip) {
Some(client) => {
let client = ~client;
let mut stream = client.connect(server_ip);
stream.write([99]);
}
None => fail!()
}
}
}
}
#[test]
fn stream_smoke_test_ip6() {
do run_in_newsched_task {
let server_ip = next_test_ip6();
let client_ip = next_test_ip6();
do spawntask_immediately {
match UdpSocket::bind(server_ip) {
Some(server) => {
let server = ~server;
let mut stream = server.connect(client_ip);
let mut buf = [0];
match stream.read(buf) {
Some(nread) => {
assert_eq!(nread, 1);
assert_eq!(buf[0], 99);
}
None => fail!()
}
}
None => fail!()
}
}
do spawntask_immediately {
match UdpSocket::bind(client_ip) {
Some(client) => {
let client = ~client;
let mut stream = client.connect(server_ip);
stream.write([99]);
}
None => fail!()
}
}
}
}
}
impl Listener<UdpStream> for UdpListener {
fn accept(&mut self) -> Option<UdpStream> { fail!() }
}

View File

@ -22,6 +22,7 @@ pub type RemoteCallbackObject = uvio::UvRemoteCallback;
pub type IoFactoryObject = uvio::UvIoFactory;
pub type RtioTcpStreamObject = uvio::UvTcpStream;
pub type RtioTcpListenerObject = uvio::UvTcpListener;
pub type RtioUdpSocketObject = uvio::UvUdpSocket;
pub trait EventLoop {
fn run(&mut self);
@ -44,13 +45,42 @@ pub trait RemoteCallback {
pub trait IoFactory {
fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>;
fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>;
fn udp_bind(&mut self, addr: IpAddr) -> Result<~RtioUdpSocketObject, IoError>;
}
pub trait RtioTcpListener {
pub trait RtioTcpListener : RtioSocket {
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>;
fn accept_simultaneously(&self);
fn dont_accept_simultaneously(&self);
}
pub trait RtioTcpStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
pub trait RtioTcpStream : RtioSocket {
fn read(&self, buf: &mut [u8]) -> Result<uint, IoError>;
fn write(&self, buf: &[u8]) -> Result<(), IoError>;
fn peer_name(&self) -> IpAddr;
fn control_congestion(&self);
fn nodelay(&self);
fn keepalive(&self, delay_in_seconds: uint);
fn letdie(&self);
}
pub trait RtioSocket {
fn socket_name(&self) -> IpAddr;
}
pub trait RtioUdpSocket : RtioSocket {
fn recvfrom(&self, buf: &mut [u8]) -> Result<(uint, IpAddr), IoError>;
fn sendto(&self, buf: &[u8], dst: IpAddr) -> Result<(), IoError>;
fn join_multicast(&self, multi: IpAddr);
fn leave_multicast(&self, multi: IpAddr);
fn loop_multicast_locally(&self);
fn dont_loop_multicast_locally(&self);
fn multicast_time_to_live(&self, ttl: int);
fn time_to_live(&self, ttl: int);
fn hear_broadcasts(&self);
fn ignore_broadcasts(&self);
}

View File

@ -8,12 +8,16 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use cell::Cell;
use libc;
use uint;
use option::{Some, None};
use cell::Cell;
use clone::Clone;
use container::Container;
use iterator::IteratorUtil;
use vec::{OwnedVector, MutableVector};
use super::io::net::ip::{IpAddr, Ipv4, Ipv6};
use rt::sched::Scheduler;
use super::io::net::ip::{IpAddr, Ipv4};
use rt::local::Local;
use unstable::run_in_bare_thread;
use rt::thread::Thread;
@ -324,11 +328,16 @@ pub fn next_test_port() -> u16 {
}
}
/// Get a unique localhost:port pair starting at 9600
/// Get a unique IPv4 localhost:port pair starting at 9600
pub fn next_test_ip4() -> IpAddr {
Ipv4(127, 0, 0, 1, next_test_port())
}
/// Get a unique IPv6 localhost:port pair starting at 9600
pub fn next_test_ip6() -> IpAddr {
Ipv6(0, 0, 0, 0, 0, 0, 0, 1, next_test_port())
}
/*
XXX: Welcome to MegaHack City.

View File

@ -46,13 +46,14 @@ use libc::{c_void, c_int, size_t, malloc, free};
use cast::transmute;
use ptr::null;
use unstable::finally::Finally;
use rt::io::net::ip::IpAddr;
use rt::io::IoError;
#[cfg(test)] use unstable::run_in_bare_thread;
pub use self::file::FsRequest;
pub use self::net::{StreamWatcher, TcpWatcher};
pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher};
pub use self::idle::IdleWatcher;
pub use self::timer::TimerWatcher;
pub use self::async::AsyncWatcher;
@ -126,6 +127,8 @@ pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
pub type FsCallback = ~fn(FsRequest, Option<UvError>);
pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>);
pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>);
pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, IpAddr, uint, Option<UvError>);
pub type UdpSendCallback = ~fn(UdpWatcher, Option<UvError>);
/// Callbacks used by StreamWatchers, set as custom data on the foreign handle
@ -137,7 +140,9 @@ struct WatcherData {
alloc_cb: Option<AllocCallback>,
idle_cb: Option<IdleCallback>,
timer_cb: Option<TimerCallback>,
async_cb: Option<AsyncCallback>
async_cb: Option<AsyncCallback>,
udp_recv_cb: Option<UdpReceiveCallback>,
udp_send_cb: Option<UdpSendCallback>
}
pub trait WatcherInterop {
@ -167,7 +172,9 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
alloc_cb: None,
idle_cb: None,
timer_cb: None,
async_cb: None
async_cb: None,
udp_recv_cb: None,
udp_send_cb: None
};
let data = transmute::<~WatcherData, *c_void>(data);
uvll::set_data_for_uv_handle(self.native_handle(), data);

View File

@ -9,35 +9,151 @@
// except according to those terms.
use prelude::*;
use libc::{size_t, ssize_t, c_int, c_void};
use libc::{size_t, ssize_t, c_int, c_void, c_uint};
use rt::uv::uvll;
use rt::uv::uvll::*;
use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback};
use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, UdpSendCallback};
use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
status_to_maybe_uv_error};
use rt::io::net::ip::{IpAddr, Ipv4, Ipv6};
use rt::uv::last_uv_error;
use vec;
use str;
use from_str::{FromStr};
use num;
fn ip4_as_uv_ip4<T>(addr: IpAddr, f: &fn(*sockaddr_in) -> T) -> T {
match addr {
Ipv4(a, b, c, d, p) => {
unsafe {
let addr = malloc_ip4_addr(fmt!("%u.%u.%u.%u",
a as uint,
b as uint,
c as uint,
d as uint), p as int);
do (|| {
f(addr)
}).finally {
free_ip4_addr(addr);
}
}
enum UvIpAddr {
UvIpv4(*sockaddr_in),
UvIpv6(*sockaddr_in6),
}
fn sockaddr_to_UvIpAddr(addr: *uvll::sockaddr) -> UvIpAddr {
unsafe {
assert!((is_ip4_addr(addr) || is_ip6_addr(addr)));
assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr)));
match addr {
_ if is_ip4_addr(addr) => UvIpv4(as_sockaddr_in(addr)),
_ if is_ip6_addr(addr) => UvIpv6(as_sockaddr_in6(addr)),
_ => fail!(),
}
Ipv6 => fail!()
}
}
fn ip_as_uv_ip<T>(addr: IpAddr, f: &fn(UvIpAddr) -> T) -> T {
let malloc = match addr {
Ipv4(*) => malloc_ip4_addr,
Ipv6(*) => malloc_ip6_addr,
};
let wrap = match addr {
Ipv4(*) => UvIpv4,
Ipv6(*) => UvIpv6,
};
let ip_str = match addr {
Ipv4(x1, x2, x3, x4, _) =>
fmt!("%u.%u.%u.%u", x1 as uint, x2 as uint, x3 as uint, x4 as uint),
Ipv6(x1, x2, x3, x4, x5, x6, x7, x8, _) =>
fmt!("%x:%x:%x:%x:%x:%x:%x:%x",
x1 as uint, x2 as uint, x3 as uint, x4 as uint,
x5 as uint, x6 as uint, x7 as uint, x8 as uint),
};
let port = match addr {
Ipv4(_, _, _, _, p) | Ipv6(_, _, _, _, _, _, _, _, p) => p as int
};
let free = match addr {
Ipv4(*) => free_ip4_addr,
Ipv6(*) => free_ip6_addr,
};
let addr = unsafe { malloc(ip_str, port) };
do (|| {
f(wrap(addr))
}).finally {
unsafe { free(addr) };
}
}
fn uv_ip_as_ip<T>(addr: UvIpAddr, f: &fn(IpAddr) -> T) -> T {
let ip_size = match addr {
UvIpv4(*) => 4/*groups of*/ * 3/*digits separated by*/ + 3/*periods*/,
UvIpv6(*) => 8/*groups of*/ * 4/*hex digits separated by*/ + 7 /*colons*/,
};
let ip_name = {
let buf = vec::from_elem(ip_size + 1 /*null terminated*/, 0u8);
unsafe {
match addr {
UvIpv4(addr) => uvll::ip4_name(addr, vec::raw::to_ptr(buf), ip_size as u64),
UvIpv6(addr) => uvll::ip6_name(addr, vec::raw::to_ptr(buf), ip_size as u64),
}
};
buf
};
let ip_port = unsafe {
let port = match addr {
UvIpv4(addr) => uvll::ip4_port(addr),
UvIpv6(addr) => uvll::ip6_port(addr),
};
port as u16
};
let ip_str = str::from_bytes_slice(ip_name).trim_right_chars(&'\x00');
let ip = match addr {
UvIpv4(*) => {
let ip: ~[u8] =
ip_str.split_iter('.')
.transform(|s: &str| -> u8 { FromStr::from_str(s).unwrap() })
.collect();
assert_eq!(ip.len(), 4);
Ipv4(ip[0], ip[1], ip[2], ip[3], ip_port)
},
UvIpv6(*) => {
let ip: ~[u16] = {
let expand_shorthand_and_convert = |s: &str| -> ~[~[u16]] {
let convert_each_segment = |s: &str| -> ~[u16] {
let read_hex_segment = |s: &str| -> u16 {
num::FromStrRadix::from_str_radix(s, 16u).unwrap()
};
match s {
"" => ~[],
s => s.split_iter(':').transform(read_hex_segment).collect(),
}
};
s.split_str_iter("::").transform(convert_each_segment).collect()
};
match expand_shorthand_and_convert(ip_str) {
[x] => x, // no shorthand found
[l, r] => l + vec::from_elem(8 - l.len() - r.len(), 0u16) + r, // fill the gap
_ => fail!(), // impossible. only one shorthand allowed.
}
};
assert_eq!(ip.len(), 8);
Ipv6(ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7], ip_port)
},
};
// finally run the closure
f(ip)
}
fn uv_ip_to_ip(addr: UvIpAddr) -> IpAddr {
use util;
uv_ip_as_ip(addr, util::id)
}
#[cfg(test)]
#[test]
fn test_ip4_conversion() {
use rt;
let ip4 = rt::test::next_test_ip4();
assert_eq!(ip4, ip_as_uv_ip(ip4, uv_ip_to_ip));
}
#[cfg(test)]
#[test]
fn test_ip6_conversion() {
use rt;
let ip6 = rt::test::next_test_ip6();
assert_eq!(ip6, ip_as_uv_ip(ip6, uv_ip_to_ip));
}
// uv_stream t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
// and uv_file_t
pub struct StreamWatcher(*uvll::uv_stream_t);
@ -51,13 +167,11 @@ impl StreamWatcher {
data.read_cb = Some(cb);
}
let handle = self.native_handle();
unsafe { uvll::read_start(handle, alloc_cb, read_cb); }
unsafe { uvll::read_start(self.native_handle(), alloc_cb, read_cb); }
extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
let data = stream_watcher.get_watcher_data();
let alloc_cb = data.alloc_cb.get_ref();
let alloc_cb = stream_watcher.get_watcher_data().alloc_cb.get_ref();
return (*alloc_cb)(suggested_size as uint);
}
@ -65,8 +179,7 @@ impl StreamWatcher {
rtdebug!("buf addr: %x", buf.base as uint);
rtdebug!("buf len: %d", buf.len as int);
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
let data = stream_watcher.get_watcher_data();
let cb = data.read_cb.get_ref();
let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
let status = status_to_maybe_uv_error(stream, nread as c_int);
(*cb)(stream_watcher, nread as int, buf, status);
}
@ -88,22 +201,15 @@ impl StreamWatcher {
}
let req = WriteRequest::new();
let bufs = [buf];
unsafe {
assert!(0 == uvll::write(req.native_handle(),
self.native_handle(),
bufs, write_cb));
assert_eq!(0, uvll::write(req.native_handle(), self.native_handle(), [buf], write_cb));
}
extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
let write_request: WriteRequest = NativeHandle::from_native_handle(req);
let mut stream_watcher = write_request.stream();
write_request.delete();
let cb = {
let data = stream_watcher.get_watcher_data();
let cb = data.write_cb.swap_unwrap();
cb
};
let cb = stream_watcher.get_watcher_data().write_cb.swap_unwrap();
let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
cb(stream_watcher, status);
}
@ -112,9 +218,7 @@ impl StreamWatcher {
pub fn accept(&mut self, stream: StreamWatcher) {
let self_handle = self.native_handle() as *c_void;
let stream_handle = stream.native_handle() as *c_void;
unsafe {
assert_eq!(0, uvll::accept(self_handle, stream_handle));
}
assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } );
}
pub fn close(self, cb: NullCallback) {
@ -129,10 +233,7 @@ impl StreamWatcher {
extern fn close_cb(handle: *uvll::uv_stream_t) {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
{
let data = stream_watcher.get_watcher_data();
data.close_cb.swap_unwrap()();
}
stream_watcher.get_watcher_data().close_cb.swap_unwrap()();
stream_watcher.drop_watcher_data();
unsafe { free_handle(handle as *c_void) }
}
@ -140,8 +241,7 @@ impl StreamWatcher {
}
impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
fn from_native_handle(
handle: *uvll::uv_stream_t) -> StreamWatcher {
fn from_native_handle(handle: *uvll::uv_stream_t) -> StreamWatcher {
StreamWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_stream_t {
@ -153,7 +253,7 @@ pub struct TcpWatcher(*uvll::uv_tcp_t);
impl Watcher for TcpWatcher { }
impl TcpWatcher {
pub fn new(loop_: &mut Loop) -> TcpWatcher {
pub fn new(loop_: &Loop) -> TcpWatcher {
unsafe {
let handle = malloc_handle(UV_TCP);
assert!(handle.is_not_null());
@ -165,20 +265,17 @@ impl TcpWatcher {
}
pub fn bind(&mut self, address: IpAddr) -> Result<(), UvError> {
match address {
Ipv4(*) => {
do ip4_as_uv_ip4(address) |addr| {
let result = unsafe {
uvll::tcp_bind(self.native_handle(), addr)
};
if result == 0 {
Ok(())
} else {
Err(last_uv_error(self))
}
do ip_as_uv_ip(address) |addr| {
let result = unsafe {
match addr {
UvIpv4(addr) => uvll::tcp_bind(self.native_handle(), addr),
UvIpv6(addr) => uvll::tcp_bind6(self.native_handle(), addr),
}
};
match result {
0 => Ok(()),
_ => Err(last_uv_error(self)),
}
_ => fail!()
}
}
@ -188,16 +285,13 @@ impl TcpWatcher {
self.get_watcher_data().connect_cb = Some(cb);
let connect_handle = ConnectRequest::new().native_handle();
match address {
Ipv4(*) => {
do ip4_as_uv_ip4(address) |addr| {
rtdebug!("connect_t: %x", connect_handle as uint);
assert!(0 == uvll::tcp_connect(connect_handle,
self.native_handle(),
addr, connect_cb));
}
}
_ => fail!()
rtdebug!("connect_t: %x", connect_handle as uint);
do ip_as_uv_ip(address) |addr| {
let result = match addr {
UvIpv4(addr) => uvll::tcp_connect(connect_handle, self.native_handle(), addr, connect_cb),
UvIpv6(addr) => uvll::tcp_connect6(connect_handle, self.native_handle(), addr, connect_cb),
};
assert_eq!(0, result);
}
extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
@ -205,10 +299,7 @@ impl TcpWatcher {
let connect_request: ConnectRequest = NativeHandle::from_native_handle(req);
let mut stream_watcher = connect_request.stream();
connect_request.delete();
let cb: ConnectionCallback = {
let data = stream_watcher.get_watcher_data();
data.connect_cb.swap_unwrap()
};
let cb = stream_watcher.get_watcher_data().connect_cb.swap_unwrap();
let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
cb(stream_watcher, status);
}
@ -225,15 +316,13 @@ impl TcpWatcher {
unsafe {
static BACKLOG: c_int = 128; // XXX should be configurable
// XXX: This can probably fail
assert!(0 == uvll::listen(self.native_handle(),
BACKLOG, connection_cb));
assert_eq!(0, uvll::listen(self.native_handle(), BACKLOG, connection_cb));
}
extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
rtdebug!("connection_cb");
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
let data = stream_watcher.get_watcher_data();
let cb = data.connect_cb.get_ref();
let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
let status = status_to_maybe_uv_error(handle, status);
(*cb)(stream_watcher, status);
}
@ -253,6 +342,125 @@ impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
}
}
pub struct UdpWatcher(*uvll::uv_udp_t);
impl Watcher for UdpWatcher { }
impl UdpWatcher {
pub fn new(loop_: &Loop) -> UdpWatcher {
unsafe {
let handle = malloc_handle(UV_UDP);
assert!(handle.is_not_null());
assert_eq!(0, uvll::udp_init(loop_.native_handle(), handle));
let mut watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
watcher.install_watcher_data();
return watcher;
}
}
pub fn bind(&self, address: IpAddr) -> Result<(), UvError> {
do ip_as_uv_ip(address) |addr| {
let result = unsafe {
match addr {
UvIpv4(addr) => uvll::udp_bind(self.native_handle(), addr, 0u32),
UvIpv6(addr) => uvll::udp_bind6(self.native_handle(), addr, 0u32),
}
};
match result {
0 => Ok(()),
_ => Err(last_uv_error(self)),
}
}
}
pub fn recv_start(&self, alloc: AllocCallback, cb: UdpReceiveCallback) {
{
let mut this = *self;
let data = this.get_watcher_data();
data.alloc_cb = Some(alloc);
data.udp_recv_cb = Some(cb);
}
unsafe { uvll::udp_recv_start(self.native_handle(), alloc_cb, recv_cb); }
extern fn alloc_cb(handle: *uvll::uv_udp_t, suggested_size: size_t) -> Buf {
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
let alloc_cb = udp_watcher.get_watcher_data().alloc_cb.get_ref();
return (*alloc_cb)(suggested_size as uint);
}
extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: Buf,
addr: *uvll::sockaddr, flags: c_uint) {
rtdebug!("buf addr: %x", buf.base as uint);
rtdebug!("buf len: %d", buf.len as int);
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
let status = status_to_maybe_uv_error(handle, nread as c_int);
let addr = uv_ip_to_ip(sockaddr_to_UvIpAddr(addr));
(*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
}
}
pub fn recv_stop(&self) {
unsafe { uvll::udp_recv_stop(self.native_handle()); }
}
pub fn send(&self, buf: Buf, address: IpAddr, cb: UdpSendCallback) {
{
let mut this = *self;
let data = this.get_watcher_data();
assert!(data.udp_send_cb.is_none());
data.udp_send_cb = Some(cb);
}
let req = UdpSendRequest::new();
do ip_as_uv_ip(address) |addr| {
let result = unsafe {
match addr {
UvIpv4(addr) => uvll::udp_send(req.native_handle(), self.native_handle(), [buf], addr, send_cb),
UvIpv6(addr) => uvll::udp_send6(req.native_handle(), self.native_handle(), [buf], addr, send_cb),
}
};
assert_eq!(0, result);
}
extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
let send_request: UdpSendRequest = NativeHandle::from_native_handle(req);
let mut udp_watcher = send_request.handle();
send_request.delete();
let cb = udp_watcher.get_watcher_data().udp_send_cb.swap_unwrap();
let status = status_to_maybe_uv_error(udp_watcher.native_handle(), status);
cb(udp_watcher, status);
}
}
pub fn close(self, cb: NullCallback) {
{
let mut this = self;
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(self.native_handle(), close_cb); }
extern fn close_cb(handle: *uvll::uv_udp_t) {
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
udp_watcher.get_watcher_data().close_cb.swap_unwrap()();
udp_watcher.drop_watcher_data();
unsafe { free_handle(handle as *c_void) }
}
}
}
impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {
fn from_native_handle(handle: *uvll::uv_udp_t) -> UdpWatcher {
UdpWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_udp_t {
match self { &UdpWatcher(ptr) => ptr }
}
}
// uv_connect_t is a subclass of uv_req_t
struct ConnectRequest(*uvll::uv_connect_t);
impl Request for ConnectRequest { }
@ -260,12 +468,9 @@ impl Request for ConnectRequest { }
impl ConnectRequest {
fn new() -> ConnectRequest {
let connect_handle = unsafe {
malloc_req(UV_CONNECT)
};
let connect_handle = unsafe { malloc_req(UV_CONNECT) };
assert!(connect_handle.is_not_null());
let connect_handle = connect_handle as *uvll::uv_connect_t;
ConnectRequest(connect_handle)
ConnectRequest(connect_handle as *uvll::uv_connect_t)
}
fn stream(&self) -> StreamWatcher {
@ -281,8 +486,7 @@ impl ConnectRequest {
}
impl NativeHandle<*uvll::uv_connect_t> for ConnectRequest {
fn from_native_handle(
handle: *uvll:: uv_connect_t) -> ConnectRequest {
fn from_native_handle(handle: *uvll:: uv_connect_t) -> ConnectRequest {
ConnectRequest(handle)
}
fn native_handle(&self) -> *uvll::uv_connect_t {
@ -296,12 +500,9 @@ impl Request for WriteRequest { }
impl WriteRequest {
pub fn new() -> WriteRequest {
let write_handle = unsafe {
malloc_req(UV_WRITE)
};
let write_handle = unsafe { malloc_req(UV_WRITE) };
assert!(write_handle.is_not_null());
let write_handle = write_handle as *uvll::uv_write_t;
WriteRequest(write_handle)
WriteRequest(write_handle as *uvll::uv_write_t)
}
pub fn stream(&self) -> StreamWatcher {
@ -325,6 +526,36 @@ impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
}
}
pub struct UdpSendRequest(*uvll::uv_udp_send_t);
impl Request for UdpSendRequest { }
impl UdpSendRequest {
pub fn new() -> UdpSendRequest {
let send_handle = unsafe { malloc_req(UV_UDP_SEND) };
assert!(send_handle.is_not_null());
UdpSendRequest(send_handle as *uvll::uv_udp_send_t)
}
pub fn handle(&self) -> UdpWatcher {
let send_request_handle = unsafe {
uvll::get_udp_handle_from_send_req(self.native_handle())
};
NativeHandle::from_native_handle(send_request_handle)
}
pub fn delete(self) {
unsafe { free_req(self.native_handle() as *c_void) }
}
}
impl NativeHandle<*uvll::uv_udp_send_t> for UdpSendRequest {
fn from_native_handle(handle: *uvll::uv_udp_send_t) -> UdpSendRequest {
UdpSendRequest(handle)
}
fn native_handle(&self) -> *uvll::uv_udp_send_t {
match self { &UdpSendRequest(ptr) => ptr }
}
}
#[cfg(test)]
mod test {
@ -339,7 +570,7 @@ mod test {
use rt::uv::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
#[test]
fn connect_close() {
fn connect_close_ip4() {
do run_in_bare_thread() {
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
@ -357,7 +588,51 @@ mod test {
}
#[test]
fn listen() {
fn connect_close_ip6() {
do run_in_bare_thread() {
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
// Connect to a port where nobody is listening
let addr = next_test_ip6();
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("tcp_watcher.connect!");
assert!(status.is_some());
assert_eq!(status.get().name(), ~"ECONNREFUSED");
stream_watcher.close(||());
}
loop_.run();
loop_.close();
}
}
#[test]
fn udp_bind_close_ip4() {
do run_in_bare_thread() {
let mut loop_ = Loop::new();
let udp_watcher = { UdpWatcher::new(&mut loop_) };
let addr = next_test_ip4();
udp_watcher.bind(addr);
udp_watcher.close(||());
loop_.run();
loop_.close();
}
}
#[test]
fn udp_bind_close_ip6() {
do run_in_bare_thread() {
let mut loop_ = Loop::new();
let udp_watcher = { UdpWatcher::new(&mut loop_) };
let addr = next_test_ip6();
udp_watcher.bind(addr);
udp_watcher.close(||());
loop_.run();
loop_.close();
}
}
#[test]
fn listen_ip4() {
do run_in_bare_thread() {
static MAX: int = 10;
let mut loop_ = Loop::new();
@ -366,10 +641,82 @@ mod test {
server_tcp_watcher.bind(addr);
let loop_ = loop_;
rtdebug!("listening");
do server_tcp_watcher.listen |server_stream_watcher, status| {
do server_tcp_watcher.listen |mut server_stream_watcher, status| {
rtdebug!("listened!");
assert!(status.is_none());
let mut loop_ = loop_;
let client_tcp_watcher = TcpWatcher::new(&mut loop_);
let mut client_tcp_watcher = client_tcp_watcher.as_stream();
server_stream_watcher.accept(client_tcp_watcher);
let count_cell = Cell::new(0);
let server_stream_watcher = server_stream_watcher;
rtdebug!("starting read");
let alloc: AllocCallback = |size| {
vec_to_uv_buf(vec::from_elem(size, 0))
};
do client_tcp_watcher.read_start(alloc) |stream_watcher, nread, buf, status| {
rtdebug!("i'm reading!");
let buf = vec_from_uv_buf(buf);
let mut count = count_cell.take();
if status.is_none() {
rtdebug!("got %d bytes", nread);
let buf = buf.unwrap();
for buf.slice(0, nread as uint).iter().advance() |byte| {
assert!(*byte == count as u8);
rtdebug!("%u", *byte as uint);
count += 1;
}
} else {
assert_eq!(count, MAX);
do stream_watcher.close {
server_stream_watcher.close(||());
}
}
count_cell.put_back(count);
}
}
let _client_thread = do Thread::start {
rtdebug!("starting client thread");
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
do tcp_watcher.connect(addr) |mut stream_watcher, status| {
rtdebug!("connecting");
assert!(status.is_none());
let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
let buf = slice_to_uv_buf(msg);
let msg_cell = Cell::new(msg);
do stream_watcher.write(buf) |stream_watcher, status| {
rtdebug!("writing");
assert!(status.is_none());
let msg_cell = Cell::new(msg_cell.take());
stream_watcher.close(||ignore(msg_cell.take()));
}
}
loop_.run();
loop_.close();
};
let mut loop_ = loop_;
loop_.run();
loop_.close();
}
}
#[test]
fn listen_ip6() {
do run_in_bare_thread() {
static MAX: int = 10;
let mut loop_ = Loop::new();
let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
let addr = next_test_ip6();
server_tcp_watcher.bind(addr);
let loop_ = loop_;
rtdebug!("listening");
do server_tcp_watcher.listen |mut server_stream_watcher, status| {
rtdebug!("listened!");
assert!(status.is_none());
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ = loop_;
let client_tcp_watcher = TcpWatcher::new(&mut loop_);
let mut client_tcp_watcher = client_tcp_watcher.as_stream();
@ -409,10 +756,9 @@ mod test {
rtdebug!("starting client thread");
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
do tcp_watcher.connect(addr) |stream_watcher, status| {
do tcp_watcher.connect(addr) |mut stream_watcher, status| {
rtdebug!("connecting");
assert!(status.is_none());
let mut stream_watcher = stream_watcher;
let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
let buf = slice_to_uv_buf(msg);
let msg_cell = Cell::new(msg);
@ -432,4 +778,122 @@ mod test {
loop_.close();
}
}
#[test]
fn udp_recv_ip4() {
do run_in_bare_thread() {
static MAX: int = 10;
let mut loop_ = Loop::new();
let server_addr = next_test_ip4();
let client_addr = next_test_ip4();
let server = UdpWatcher::new(&loop_);
assert!(server.bind(server_addr).is_ok());
rtdebug!("starting read");
let alloc: AllocCallback = |size| {
vec_to_uv_buf(vec::from_elem(size, 0))
};
do server.recv_start(alloc) |server, nread, buf, src, flags, status| {
server.recv_stop();
rtdebug!("i'm reading!");
assert!(status.is_none());
assert_eq!(flags, 0);
assert_eq!(src, client_addr);
let buf = vec_from_uv_buf(buf);
let mut count = 0;
rtdebug!("got %d bytes", nread);
let buf = buf.unwrap();
for buf.slice(0, nread as uint).iter().advance() |&byte| {
assert!(byte == count as u8);
rtdebug!("%u", byte as uint);
count += 1;
}
assert_eq!(count, MAX);
server.close(||{});
}
do Thread::start {
let mut loop_ = Loop::new();
let client = UdpWatcher::new(&loop_);
assert!(client.bind(client_addr).is_ok());
let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let buf = slice_to_uv_buf(msg);
do client.send(buf, server_addr) |client, status| {
rtdebug!("writing");
assert!(status.is_none());
client.close(||{});
}
loop_.run();
loop_.close();
};
loop_.run();
loop_.close();
}
}
#[test]
fn udp_recv_ip6() {
do run_in_bare_thread() {
static MAX: int = 10;
let mut loop_ = Loop::new();
let server_addr = next_test_ip6();
let client_addr = next_test_ip6();
let server = UdpWatcher::new(&loop_);
assert!(server.bind(server_addr).is_ok());
rtdebug!("starting read");
let alloc: AllocCallback = |size| {
vec_to_uv_buf(vec::from_elem(size, 0))
};
do server.recv_start(alloc) |server, nread, buf, src, flags, status| {
server.recv_stop();
rtdebug!("i'm reading!");
assert!(status.is_none());
assert_eq!(flags, 0);
assert_eq!(src, client_addr);
let buf = vec_from_uv_buf(buf);
let mut count = 0;
rtdebug!("got %d bytes", nread);
let buf = buf.unwrap();
for buf.slice(0, nread as uint).iter().advance() |&byte| {
assert!(byte == count as u8);
rtdebug!("%u", byte as uint);
count += 1;
}
assert_eq!(count, MAX);
server.close(||{});
}
do Thread::start {
let mut loop_ = Loop::new();
let client = UdpWatcher::new(&loop_);
assert!(client.bind(client_addr).is_ok());
let msg = ~[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let buf = slice_to_uv_buf(msg);
do client.send(buf, server_addr) |client, status| {
rtdebug!("writing");
assert!(status.is_none());
client.close(||{});
}
loop_.run();
loop_.close();
};
loop_.run();
loop_.close();
}
}
}

View File

@ -63,9 +63,8 @@ impl EventLoop for UvEventLoop {
fn callback(&mut self, f: ~fn()) {
let mut idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
do idle_watcher.start |idle_watcher, status| {
do idle_watcher.start |mut idle_watcher, status| {
assert!(status.is_none());
let mut idle_watcher = idle_watcher;
idle_watcher.stop();
idle_watcher.close(||());
f();
@ -221,7 +220,7 @@ impl IoFactory for UvIoFactory {
rtdebug!("connect: in connect callback");
if status.is_none() {
rtdebug!("status is none");
let res = Ok(~UvTcpStream { watcher: stream_watcher });
let res = Ok(~UvTcpStream(stream_watcher));
// Store the stream in the task's stack
unsafe { (*result_cell_ptr).put_back(res); }
@ -263,6 +262,24 @@ impl IoFactory for UvIoFactory {
}
}
}
fn udp_bind(&mut self, addr: IpAddr) -> Result<~RtioUdpSocketObject, IoError> {
let /*mut*/ watcher = UdpWatcher::new(self.uv_loop());
match watcher.bind(addr) {
Ok(_) => Ok(~UvUdpSocket(watcher)),
Err(uverr) => {
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do watcher.close {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
}
}
Err(uv_error_to_io_error(uverr))
}
}
}
}
// FIXME #6090: Prefer newtype structs but Drop doesn't work
@ -298,6 +315,11 @@ impl Drop for UvTcpListener {
}
}
impl RtioSocket for UvTcpListener {
// XXX implement
fn socket_name(&self) -> IpAddr { fail!(); }
}
impl RtioTcpListener for UvTcpListener {
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
@ -314,15 +336,14 @@ impl RtioTcpListener for UvTcpListener {
let incoming_streams_cell = Cell::new(incoming_streams_cell.take());
let mut server_tcp_watcher = server_tcp_watcher;
do server_tcp_watcher.listen |server_stream_watcher, status| {
do server_tcp_watcher.listen |mut server_stream_watcher, status| {
let maybe_stream = if status.is_none() {
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ = server_stream_watcher.event_loop();
let client_tcp_watcher = TcpWatcher::new(&mut loop_);
let client_tcp_watcher = client_tcp_watcher.as_stream();
// XXX: Need's to be surfaced in interface
server_stream_watcher.accept(client_tcp_watcher);
Ok(~UvTcpStream { watcher: client_tcp_watcher })
Ok(~UvTcpStream(client_tcp_watcher))
} else {
Err(standard_error(OtherIoError))
};
@ -334,25 +355,22 @@ impl RtioTcpListener for UvTcpListener {
return self.incoming_streams.recv();
}
// XXX implement
fn accept_simultaneously(&self) { fail!(); }
fn dont_accept_simultaneously(&self) { fail!(); }
}
// FIXME #6090: Prefer newtype structs but Drop doesn't work
pub struct UvTcpStream {
watcher: StreamWatcher
}
impl UvTcpStream {
fn watcher(&self) -> StreamWatcher { self.watcher }
}
pub struct UvTcpStream(StreamWatcher);
impl Drop for UvTcpStream {
fn drop(&self) {
rtdebug!("closing tcp stream");
let watcher = self.watcher();
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do watcher.close {
do self.close {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
}
@ -360,32 +378,35 @@ impl Drop for UvTcpStream {
}
}
impl RtioSocket for UvTcpStream {
// XXX implement
fn socket_name(&self) -> IpAddr { fail!(); }
}
impl RtioTcpStream for UvTcpStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
fn read(&self, buf: &mut [u8]) -> Result<uint, IoError> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context());
let watcher = self.watcher();
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |sched, task| {
rtdebug!("read: entered scheduler context");
assert!(!sched.in_task_context());
let mut watcher = watcher;
let task_cell = Cell::new(task);
// XXX: We shouldn't reallocate these callbacks every
// call to read
let alloc: AllocCallback = |_| unsafe {
slice_to_uv_buf(*buf_ptr)
};
do watcher.read_start(alloc) |watcher, nread, _buf, status| {
let mut watcher = **self;
do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
// Stop reading so that no read callbacks are
// triggered before the user calls `read` again.
// XXX: Is there a performance impact to calling
// stop here?
let mut watcher = watcher;
watcher.read_stop();
let result = if status.is_none() {
@ -406,17 +427,16 @@ impl RtioTcpStream for UvTcpStream {
return result_cell.take();
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
fn write(&self, buf: &[u8]) -> Result<(), IoError> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context());
let watcher = self.watcher();
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let mut watcher = watcher;
let task_cell = Cell::new(task);
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
let mut watcher = **self;
do watcher.write(buf) |_watcher, status| {
let result = if status.is_none() {
Ok(())
@ -434,6 +454,112 @@ impl RtioTcpStream for UvTcpStream {
assert!(!result_cell.is_empty());
return result_cell.take();
}
// XXX implement
fn peer_name(&self) -> IpAddr { fail!(); }
fn control_congestion(&self) { fail!(); }
fn nodelay(&self) { fail!(); }
fn keepalive(&self, _delay_in_seconds: uint) { fail!(); }
fn letdie(&self) { fail!(); }
}
pub struct UvUdpSocket(UdpWatcher);
impl Drop for UvUdpSocket {
fn drop(&self) {
rtdebug!("closing udp socket");
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self.close {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
}
}
}
}
impl RtioSocket for UvUdpSocket {
// XXX implement
fn socket_name(&self) -> IpAddr { fail!(); }
}
impl RtioUdpSocket for UvUdpSocket {
fn recvfrom(&self, buf: &mut [u8]) -> Result<(uint, IpAddr), IoError> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(uint, IpAddr), IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context());
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |sched, task| {
rtdebug!("recvfrom: entered scheduler context");
assert!(!sched.in_task_context());
let task_cell = Cell::new(task);
let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
do self.recv_start(alloc) |watcher, nread, _buf, addr, flags, status| {
let _ = flags; // XXX add handling for partials?
watcher.recv_stop();
let result = match status {
None => {
assert!(nread >= 0);
Ok((nread as uint, addr))
}
Some(err) => Err(uv_error_to_io_error(err))
};
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
return result_cell.take();
}
fn sendto(&self, buf: &[u8], dst: IpAddr) -> Result<(), IoError> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context());
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
do self.send(buf, dst) |_watcher, status| {
let result = match status {
None => Ok(()),
Some(err) => Err(uv_error_to_io_error(err)),
};
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
return result_cell.take();
}
// XXX implement
fn join_multicast(&self, _multi: IpAddr) { fail!(); }
fn leave_multicast(&self, _multi: IpAddr) { fail!(); }
fn loop_multicast_locally(&self) { fail!(); }
fn dont_loop_multicast_locally(&self) { fail!(); }
fn multicast_time_to_live(&self, _ttl: int) { fail!(); }
fn time_to_live(&self, _ttl: int) { fail!(); }
fn hear_broadcasts(&self) { fail!(); }
fn ignore_broadcasts(&self) { fail!(); }
}
#[test]
@ -448,6 +574,18 @@ fn test_simple_io_no_connect() {
}
}
#[test]
fn test_simple_udp_io_bind_only() {
do run_in_newsched_task {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let addr = next_test_ip4();
let maybe_socket = (*io).udp_bind(addr);
assert!(maybe_socket.is_ok());
}
}
}
#[test]
fn test_simple_tcp_server_and_client() {
do run_in_newsched_task {
@ -458,7 +596,7 @@ fn test_simple_tcp_server_and_client() {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut listener = (*io).tcp_bind(addr).unwrap();
let mut stream = listener.accept().unwrap();
let stream = listener.accept().unwrap();
let mut buf = [0, .. 2048];
let nread = stream.read(buf).unwrap();
assert_eq!(nread, 8);
@ -472,13 +610,44 @@ fn test_simple_tcp_server_and_client() {
do spawntask_immediately {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut stream = (*io).tcp_connect(addr).unwrap();
let stream = (*io).tcp_connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
}
}
}
}
#[test]
fn test_simple_udp_server_and_client() {
do run_in_newsched_task {
let server_addr = next_test_ip4();
let client_addr = next_test_ip4();
do spawntask_immediately {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let server_socket = (*io).udp_bind(server_addr).unwrap();
let mut buf = [0, .. 2048];
let (nread,src) = server_socket.recvfrom(buf).unwrap();
assert_eq!(nread, 8);
for uint::range(0, nread) |i| {
rtdebug!("%u", buf[i] as uint);
assert_eq!(buf[i], i as u8);
}
assert_eq!(src, client_addr);
}
}
do spawntask_immediately {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let client_socket = (*io).udp_bind(client_addr).unwrap();
client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr);
}
}
}
}
#[test] #[ignore(reason = "busted")]
fn test_read_and_block() {
do run_in_newsched_task {
@ -487,7 +656,7 @@ fn test_read_and_block() {
do spawntask_immediately {
let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
let mut stream = listener.accept().unwrap();
let stream = listener.accept().unwrap();
let mut buf = [0, .. 2048];
let expected = 32;
@ -520,7 +689,7 @@ fn test_read_and_block() {
do spawntask_immediately {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut stream = (*io).tcp_connect(addr).unwrap();
let stream = (*io).tcp_connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
@ -541,7 +710,7 @@ fn test_read_read_read() {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut listener = (*io).tcp_bind(addr).unwrap();
let mut stream = listener.accept().unwrap();
let stream = listener.accept().unwrap();
let buf = [1, .. 2048];
let mut total_bytes_written = 0;
while total_bytes_written < MAX {
@ -554,7 +723,7 @@ fn test_read_read_read() {
do spawntask_immediately {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut stream = (*io).tcp_connect(addr).unwrap();
let stream = (*io).tcp_connect(addr).unwrap();
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;
while total_bytes_read < MAX {
@ -570,3 +739,96 @@ fn test_read_read_read() {
}
}
}
#[test]
fn test_udp_twice() {
do run_in_newsched_task {
let server_addr = next_test_ip4();
let client_addr = next_test_ip4();
do spawntask_immediately {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let client = (*io).udp_bind(client_addr).unwrap();
assert!(client.sendto([1], server_addr).is_ok());
assert!(client.sendto([2], server_addr).is_ok());
}
}
do spawntask_immediately {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let server = (*io).udp_bind(server_addr).unwrap();
let mut buf1 = [0];
let mut buf2 = [0];
let (nread1, src1) = server.recvfrom(buf1).unwrap();
let (nread2, src2) = server.recvfrom(buf2).unwrap();
assert_eq!(nread1, 1);
assert_eq!(nread2, 1);
assert_eq!(src1, client_addr);
assert_eq!(src2, client_addr);
assert_eq!(buf1[0], 1);
assert_eq!(buf2[0], 2);
}
}
}
}
#[test]
fn test_udp_many_read() {
do run_in_newsched_task {
let server_out_addr = next_test_ip4();
let server_in_addr = next_test_ip4();
let client_out_addr = next_test_ip4();
let client_in_addr = next_test_ip4();
static MAX: uint = 500_000;
do spawntask_immediately {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let server_out = (*io).udp_bind(server_out_addr).unwrap();
let server_in = (*io).udp_bind(server_in_addr).unwrap();
let msg = [1, .. 2048];
let mut total_bytes_sent = 0;
let mut buf = [1];
while buf[0] == 1 {
// send more data
assert!(server_out.sendto(msg, client_in_addr).is_ok());
total_bytes_sent += msg.len();
// check if the client has received enough
let res = server_in.recvfrom(buf);
assert!(res.is_ok());
let (nread, src) = res.unwrap();
assert_eq!(nread, 1);
assert_eq!(src, client_out_addr);
}
assert!(total_bytes_sent >= MAX);
}
}
do spawntask_immediately {
unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
let client_out = (*io).udp_bind(client_out_addr).unwrap();
let client_in = (*io).udp_bind(client_in_addr).unwrap();
let mut total_bytes_recv = 0;
let mut buf = [0, .. 2048];
while total_bytes_recv < MAX {
// ask for more
assert!(client_out.sendto([1], server_in_addr).is_ok());
// wait for data
let res = client_in.recvfrom(buf);
assert!(res.is_ok());
let (nread, src) = res.unwrap();
assert_eq!(src, server_out_addr);
total_bytes_recv += nread;
for uint::range(0, nread) |i| {
assert_eq!(buf[i], 1);
}
}
// tell the server we're done
assert!(client_out.sendto([0], server_in_addr).is_ok());
}
}
}
}

View File

@ -60,17 +60,24 @@ pub type uv_handle_t = c_void;
pub type uv_loop_t = c_void;
pub type uv_idle_t = c_void;
pub type uv_tcp_t = c_void;
pub type uv_udp_t = c_void;
pub type uv_connect_t = c_void;
pub type uv_write_t = c_void;
pub type uv_async_t = c_void;
pub type uv_timer_t = c_void;
pub type uv_stream_t = c_void;
pub type uv_fs_t = c_void;
pub type uv_udp_send_t = c_void;
pub type uv_idle_cb = *u8;
pub type uv_alloc_cb = *u8;
pub type uv_udp_send_cb = *u8;
pub type uv_udp_recv_cb = *u8;
pub type sockaddr = c_void;
pub type sockaddr_in = c_void;
pub type sockaddr_in6 = c_void;
pub type uv_membership = c_void;
#[deriving(Eq)]
pub enum uv_handle_type {
@ -187,31 +194,88 @@ pub unsafe fn idle_stop(handle: *uv_idle_t) -> c_int {
rust_uv_idle_stop(handle)
}
pub unsafe fn udp_init(loop_handle: *uv_loop_t, handle: *uv_udp_t) -> c_int {
return rust_uv_udp_init(loop_handle, handle);
}
pub unsafe fn udp_bind(server: *uv_udp_t, addr: *sockaddr_in, flags: c_uint) -> c_int {
return rust_uv_udp_bind(server, addr, flags);
}
pub unsafe fn udp_bind6(server: *uv_udp_t, addr: *sockaddr_in6, flags: c_uint) -> c_int {
return rust_uv_udp_bind6(server, addr, flags);
}
pub unsafe fn udp_send<T>(req: *uv_udp_send_t, handle: *T, buf_in: &[uv_buf_t],
addr: *sockaddr_in, cb: uv_udp_send_cb) -> c_int {
let buf_ptr = vec::raw::to_ptr(buf_in);
let buf_cnt = buf_in.len() as i32;
return rust_uv_udp_send(req, handle as *c_void, buf_ptr, buf_cnt, addr, cb);
}
pub unsafe fn udp_send6<T>(req: *uv_udp_send_t, handle: *T, buf_in: &[uv_buf_t],
addr: *sockaddr_in6, cb: uv_udp_send_cb) -> c_int {
let buf_ptr = vec::raw::to_ptr(buf_in);
let buf_cnt = buf_in.len() as i32;
return rust_uv_udp_send6(req, handle as *c_void, buf_ptr, buf_cnt, addr, cb);
}
pub unsafe fn udp_recv_start(server: *uv_udp_t, on_alloc: uv_alloc_cb,
on_recv: uv_udp_recv_cb) -> c_int {
return rust_uv_udp_recv_start(server, on_alloc, on_recv);
}
pub unsafe fn udp_recv_stop(server: *uv_udp_t) -> c_int {
return rust_uv_udp_recv_stop(server);
}
pub unsafe fn get_udp_handle_from_send_req(send_req: *uv_udp_send_t) -> *uv_udp_t {
return rust_uv_get_udp_handle_from_send_req(send_req);
}
pub unsafe fn udp_get_sockname(handle: *uv_udp_t, name: *sockaddr_in) -> c_int {
return rust_uv_udp_getsockname(handle, name);
}
pub unsafe fn udp_get_sockname6(handle: *uv_udp_t, name: *sockaddr_in6) -> c_int {
return rust_uv_udp_getsockname6(handle, name);
}
pub unsafe fn udp_set_membership(handle: *uv_udp_t, multicast_addr: *c_char,
interface_addr: *c_char, membership: uv_membership) -> c_int {
return rust_uv_udp_set_membership(handle, multicast_addr, interface_addr, membership);
}
pub unsafe fn udp_set_multicast_loop(handle: *uv_udp_t, on: c_int) -> c_int {
return rust_uv_udp_set_multicast_loop(handle, on);
}
pub unsafe fn udp_set_multicast_ttl(handle: *uv_udp_t, ttl: c_int) -> c_int {
return rust_uv_udp_set_multicast_ttl(handle, ttl);
}
pub unsafe fn udp_set_broadcast(handle: *uv_udp_t, on: c_int) -> c_int {
return rust_uv_udp_set_broadcast(handle, on);
}
pub unsafe fn tcp_init(loop_handle: *c_void, handle: *uv_tcp_t) -> c_int {
return rust_uv_tcp_init(loop_handle, handle);
}
// FIXME ref #2064
pub unsafe fn tcp_connect(connect_ptr: *uv_connect_t,
tcp_handle_ptr: *uv_tcp_t,
addr_ptr: *sockaddr_in,
after_connect_cb: *u8) -> c_int {
return rust_uv_tcp_connect(connect_ptr, tcp_handle_ptr,
after_connect_cb, addr_ptr);
pub unsafe fn tcp_connect(connect_ptr: *uv_connect_t, tcp_handle_ptr: *uv_tcp_t,
addr_ptr: *sockaddr_in, after_connect_cb: *u8) -> c_int {
return rust_uv_tcp_connect(connect_ptr, tcp_handle_ptr, after_connect_cb, addr_ptr);
}
// FIXME ref #2064
pub unsafe fn tcp_connect6(connect_ptr: *uv_connect_t,
tcp_handle_ptr: *uv_tcp_t,
addr_ptr: *sockaddr_in6,
after_connect_cb: *u8) -> c_int {
return rust_uv_tcp_connect6(connect_ptr, tcp_handle_ptr,
after_connect_cb, addr_ptr);
pub unsafe fn tcp_connect6(connect_ptr: *uv_connect_t, tcp_handle_ptr: *uv_tcp_t,
addr_ptr: *sockaddr_in6, after_connect_cb: *u8) -> c_int {
return rust_uv_tcp_connect6(connect_ptr, tcp_handle_ptr, after_connect_cb, addr_ptr);
}
// FIXME ref #2064
pub unsafe fn tcp_bind(tcp_server_ptr: *uv_tcp_t, addr_ptr: *sockaddr_in) -> c_int {
return rust_uv_tcp_bind(tcp_server_ptr, addr_ptr);
}
// FIXME ref #2064
pub unsafe fn tcp_bind6(tcp_server_ptr: *uv_tcp_t, addr_ptr: *sockaddr_in6) -> c_int {
return rust_uv_tcp_bind6(tcp_server_ptr, addr_ptr);
}
@ -224,6 +288,26 @@ pub unsafe fn tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in6) -
return rust_uv_tcp_getpeername6(tcp_handle_ptr, name);
}
pub unsafe fn tcp_getsockname(handle: *uv_tcp_t, name: *sockaddr_in) -> c_int {
return rust_uv_tcp_getsockname(handle, name);
}
pub unsafe fn tcp_getsockname6(handle: *uv_tcp_t, name: *sockaddr_in6) -> c_int {
return rust_uv_tcp_getsockname6(handle, name);
}
pub unsafe fn tcp_nodelay(handle: *uv_tcp_t, enable: c_int) -> c_int {
return rust_uv_tcp_nodelay(handle, enable);
}
pub unsafe fn tcp_keepalive(handle: *uv_tcp_t, enable: c_int, delay: c_uint) -> c_int {
return rust_uv_tcp_keepalive(handle, enable, delay);
}
pub unsafe fn tcp_simultaneous_accepts(handle: *uv_tcp_t, enable: c_int) -> c_int {
return rust_uv_tcp_simultaneous_accepts(handle, enable);
}
pub unsafe fn listen<T>(stream: *T, backlog: c_int, cb: *u8) -> c_int {
return rust_uv_listen(stream as *c_void, backlog, cb);
}
@ -237,7 +321,7 @@ pub unsafe fn write<T>(req: *uv_write_t, stream: *T, buf_in: &[uv_buf_t], cb: *u
let buf_cnt = buf_in.len() as i32;
return rust_uv_write(req as *c_void, stream as *c_void, buf_ptr, buf_cnt, cb);
}
pub unsafe fn read_start(stream: *uv_stream_t, on_alloc: *u8, on_read: *u8) -> c_int {
pub unsafe fn read_start(stream: *uv_stream_t, on_alloc: uv_alloc_cb, on_read: *u8) -> c_int {
return rust_uv_read_start(stream as *c_void, on_alloc, on_read);
}
@ -281,6 +365,22 @@ pub unsafe fn timer_stop(timer_ptr: *uv_timer_t) -> c_int {
return rust_uv_timer_stop(timer_ptr);
}
pub unsafe fn is_ip4_addr(addr: *sockaddr) -> bool {
match rust_uv_is_ipv4_sockaddr(addr) { 0 => false, _ => true }
}
pub unsafe fn is_ip6_addr(addr: *sockaddr) -> bool {
match rust_uv_is_ipv6_sockaddr(addr) { 0 => false, _ => true }
}
pub unsafe fn as_sockaddr_in(addr: *sockaddr) -> *sockaddr_in {
return rust_uv_sockaddr_as_sockaddr_in(addr);
}
pub unsafe fn as_sockaddr_in6(addr: *sockaddr) -> *sockaddr_in6 {
return rust_uv_sockaddr_as_sockaddr_in6(addr);
}
pub unsafe fn malloc_ip4_addr(ip: &str, port: int) -> *sockaddr_in {
do str::as_c_str(ip) |ip_buf| {
rust_uv_ip4_addrp(ip_buf as *u8, port as libc::c_int)
@ -300,6 +400,22 @@ pub unsafe fn free_ip6_addr(addr: *sockaddr_in6) {
rust_uv_free_ip6_addr(addr);
}
pub unsafe fn ip4_name(addr: *sockaddr_in, dst: *u8, size: size_t) -> c_int {
return rust_uv_ip4_name(addr, dst, size);
}
pub unsafe fn ip6_name(addr: *sockaddr_in6, dst: *u8, size: size_t) -> c_int {
return rust_uv_ip6_name(addr, dst, size);
}
pub unsafe fn ip4_port(addr: *sockaddr_in) -> c_uint {
return rust_uv_ip4_port(addr);
}
pub unsafe fn ip6_port(addr: *sockaddr_in6) -> c_uint {
return rust_uv_ip6_port(addr);
}
// data access helpers
pub unsafe fn get_loop_for_uv_handle<T>(handle: *T) -> *c_void {
return rust_uv_get_loop_for_uv_handle(handle as *c_void);
@ -384,16 +500,11 @@ extern {
fn rust_uv_idle_stop(handle: *uv_idle_t) -> c_int;
fn rust_uv_async_send(handle: *uv_async_t);
fn rust_uv_async_init(loop_handle: *c_void,
async_handle: *uv_async_t,
cb: *u8) -> c_int;
fn rust_uv_async_init(loop_handle: *c_void, async_handle: *uv_async_t, cb: *u8) -> c_int;
fn rust_uv_tcp_init(loop_handle: *c_void, handle_ptr: *uv_tcp_t) -> c_int;
// FIXME ref #2604 .. ?
fn rust_uv_buf_init(out_buf: *uv_buf_t, base: *u8, len: size_t);
fn rust_uv_last_error(loop_handle: *c_void) -> uv_err_t;
// FIXME ref #2064
fn rust_uv_strerror(err: *uv_err_t) -> *c_char;
// FIXME ref #2064
fn rust_uv_err_name(err: *uv_err_t) -> *c_char;
fn rust_uv_ip4_addrp(ip: *u8, port: c_int) -> *sockaddr_in;
fn rust_uv_ip6_addrp(ip: *u8, port: c_int) -> *sockaddr_in6;
@ -403,40 +514,51 @@ extern {
fn rust_uv_ip6_name(src: *sockaddr_in6, dst: *u8, size: size_t) -> c_int;
fn rust_uv_ip4_port(src: *sockaddr_in) -> c_uint;
fn rust_uv_ip6_port(src: *sockaddr_in6) -> c_uint;
// FIXME ref #2064
fn rust_uv_tcp_connect(connect_ptr: *uv_connect_t,
tcp_handle_ptr: *uv_tcp_t,
after_cb: *u8,
fn rust_uv_tcp_connect(req: *uv_connect_t, handle: *uv_tcp_t, cb: *u8,
addr: *sockaddr_in) -> c_int;
// FIXME ref #2064
fn rust_uv_tcp_bind(tcp_server: *uv_tcp_t, addr: *sockaddr_in) -> c_int;
// FIXME ref #2064
fn rust_uv_tcp_connect6(connect_ptr: *uv_connect_t,
tcp_handle_ptr: *uv_tcp_t,
after_cb: *u8,
fn rust_uv_tcp_connect6(req: *uv_connect_t, handle: *uv_tcp_t, cb: *u8,
addr: *sockaddr_in6) -> c_int;
// FIXME ref #2064
fn rust_uv_tcp_bind6(tcp_server: *uv_tcp_t, addr: *sockaddr_in6) -> c_int;
fn rust_uv_tcp_getpeername(tcp_handle_ptr: *uv_tcp_t,
name: *sockaddr_in) -> c_int;
fn rust_uv_tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t,
name: *sockaddr_in6) ->c_int;
fn rust_uv_tcp_getpeername(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in) -> c_int;
fn rust_uv_tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in6) ->c_int;
fn rust_uv_tcp_getsockname(handle: *uv_tcp_t, name: *sockaddr_in) -> c_int;
fn rust_uv_tcp_getsockname6(handle: *uv_tcp_t, name: *sockaddr_in6) -> c_int;
fn rust_uv_tcp_nodelay(handle: *uv_tcp_t, enable: c_int) -> c_int;
fn rust_uv_tcp_keepalive(handle: *uv_tcp_t, enable: c_int, delay: c_uint) -> c_int;
fn rust_uv_tcp_simultaneous_accepts(handle: *uv_tcp_t, enable: c_int) -> c_int;
fn rust_uv_udp_init(loop_handle: *uv_loop_t, handle_ptr: *uv_udp_t) -> c_int;
fn rust_uv_udp_bind(server: *uv_udp_t, addr: *sockaddr_in, flags: c_uint) -> c_int;
fn rust_uv_udp_bind6(server: *uv_udp_t, addr: *sockaddr_in6, flags: c_uint) -> c_int;
fn rust_uv_udp_send(req: *uv_udp_send_t, handle: *uv_udp_t, buf_in: *uv_buf_t,
buf_cnt: c_int, addr: *sockaddr_in, cb: *u8) -> c_int;
fn rust_uv_udp_send6(req: *uv_udp_send_t, handle: *uv_udp_t, buf_in: *uv_buf_t,
buf_cnt: c_int, addr: *sockaddr_in6, cb: *u8) -> c_int;
fn rust_uv_udp_recv_start(server: *uv_udp_t, on_alloc: *u8, on_recv: *u8) -> c_int;
fn rust_uv_udp_recv_stop(server: *uv_udp_t) -> c_int;
fn rust_uv_get_udp_handle_from_send_req(req: *uv_udp_send_t) -> *uv_udp_t;
fn rust_uv_udp_getsockname(handle: *uv_udp_t, name: *sockaddr_in) -> c_int;
fn rust_uv_udp_getsockname6(handle: *uv_udp_t, name: *sockaddr_in6) -> c_int;
fn rust_uv_udp_set_membership(handle: *uv_udp_t, multicast_addr: *c_char,
interface_addr: *c_char, membership: uv_membership) -> c_int;
fn rust_uv_udp_set_multicast_loop(handle: *uv_udp_t, on: c_int) -> c_int;
fn rust_uv_udp_set_multicast_ttl(handle: *uv_udp_t, ttl: c_int) -> c_int;
fn rust_uv_udp_set_broadcast(handle: *uv_udp_t, on: c_int) -> c_int;
fn rust_uv_is_ipv4_sockaddr(addr: *sockaddr) -> c_int;
fn rust_uv_is_ipv6_sockaddr(addr: *sockaddr) -> c_int;
fn rust_uv_sockaddr_as_sockaddr_in(addr: *sockaddr) -> *sockaddr_in;
fn rust_uv_sockaddr_as_sockaddr_in6(addr: *sockaddr) -> *sockaddr_in6;
fn rust_uv_listen(stream: *c_void, backlog: c_int, cb: *u8) -> c_int;
fn rust_uv_accept(server: *c_void, client: *c_void) -> c_int;
fn rust_uv_write(req: *c_void,
stream: *c_void,
buf_in: *uv_buf_t,
buf_cnt: c_int,
fn rust_uv_write(req: *c_void, stream: *c_void, buf_in: *uv_buf_t, buf_cnt: c_int,
cb: *u8) -> c_int;
fn rust_uv_read_start(stream: *c_void,
on_alloc: *u8,
on_read: *u8) -> c_int;
fn rust_uv_read_start(stream: *c_void, on_alloc: *u8, on_read: *u8) -> c_int;
fn rust_uv_read_stop(stream: *c_void) -> c_int;
fn rust_uv_timer_init(loop_handle: *c_void,
timer_handle: *uv_timer_t) -> c_int;
fn rust_uv_timer_start(timer_handle: *uv_timer_t,
cb: *u8,
timeout: libc::uint64_t,
fn rust_uv_timer_init(loop_handle: *c_void, timer_handle: *uv_timer_t) -> c_int;
fn rust_uv_timer_start(timer_handle: *uv_timer_t, cb: *u8, timeout: libc::uint64_t,
repeat: libc::uint64_t) -> c_int;
fn rust_uv_timer_stop(handle: *uv_timer_t) -> c_int;

View File

@ -1,458 +0,0 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use option::*;
use result::*;
use super::io::net::ip::IpAddr;
use super::uv::*;
use super::rtio::*;
use ops::Drop;
use cell::Cell;
use cast::transmute;
use super::sched::{Scheduler, local_sched};
#[cfg(test)] use container::Container;
#[cfg(test)] use uint;
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use super::test::*;
pub struct UvEventLoop {
uvio: UvIoFactory
}
impl UvEventLoop {
pub fn new() -> UvEventLoop {
UvEventLoop {
uvio: UvIoFactory(Loop::new())
}
}
/// A convenience constructor
pub fn new_scheduler() -> Scheduler {
Scheduler::new(~UvEventLoop::new())
}
}
impl Drop for UvEventLoop {
fn drop(&self) {
// XXX: Need mutable finalizer
let this = unsafe {
transmute::<&UvEventLoop, &mut UvEventLoop>(self)
};
this.uvio.uv_loop().close();
}
}
impl EventLoop for UvEventLoop {
fn run(&mut self) {
self.uvio.uv_loop().run();
}
fn callback(&mut self, f: ~fn()) {
let mut idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
do idle_watcher.start |idle_watcher, status| {
assert!(status.is_none());
let mut idle_watcher = idle_watcher;
idle_watcher.stop();
idle_watcher.close();
f();
}
}
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
Some(&mut self.uvio)
}
}
#[test]
fn test_callback_run_once() {
do run_in_bare_thread {
let mut event_loop = UvEventLoop::new();
let mut count = 0;
let count_ptr: *mut int = &mut count;
do event_loop.callback {
unsafe { *count_ptr += 1 }
}
event_loop.run();
assert!(count == 1);
}
}
pub struct UvIoFactory(Loop);
impl UvIoFactory {
pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop {
match self { &UvIoFactory(ref mut ptr) => ptr }
}
}
impl IoFactory for UvIoFactory {
// Connect to an address and return a new stream
// NB: This blocks the task waiting on the connection.
// It would probably be better to return a future
fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject> {
// Create a cell in the task to hold the result. We will fill
// the cell before resuming the task.
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
let scheduler = local_sched::take();
assert!(scheduler.in_task_context());
// Block this task and take ownership, switch to scheduler context
do scheduler.deschedule_running_task_and_then |task| {
rtdebug!("connect: entered scheduler context");
do local_sched::borrow |scheduler| {
assert!(!scheduler.in_task_context());
}
let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
let task_cell = Cell::new(task);
// Wait for a connection
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("connect: in connect callback");
let maybe_stream = if status.is_none() {
rtdebug!("status is none");
Some(~UvStream(stream_watcher))
} else {
rtdebug!("status is some");
stream_watcher.close(||());
None
};
// Store the stream in the task's stack
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
// Context switch
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
return result_cell.take();
}
fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject> {
let mut watcher = TcpWatcher::new(self.uv_loop());
watcher.bind(addr);
return Some(~UvTcpListener(watcher));
}
}
pub struct UvTcpListener(TcpWatcher);
impl UvTcpListener {
fn watcher(&self) -> TcpWatcher {
match self { &UvTcpListener(w) => w }
}
fn close(&self) {
// XXX: Need to wait until close finishes before returning
self.watcher().as_stream().close(||());
}
}
impl Drop for UvTcpListener {
fn drop(&self) {
// XXX: Again, this never gets called. Use .close() instead
//self.watcher().as_stream().close(||());
}
}
impl TcpListener for UvTcpListener {
fn listen(&mut self) -> Option<~StreamObject> {
rtdebug!("entering listen");
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
let server_tcp_watcher = self.watcher();
let scheduler = local_sched::take();
assert!(scheduler.in_task_context());
do scheduler.deschedule_running_task_and_then |task| {
let task_cell = Cell::new(task);
let mut server_tcp_watcher = server_tcp_watcher;
do server_tcp_watcher.listen |server_stream_watcher, status| {
let maybe_stream = if status.is_none() {
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ = loop_from_watcher(&server_stream_watcher);
let client_tcp_watcher = TcpWatcher::new(&mut loop_).as_stream();
// XXX: Needs to be surfaced in interface
server_stream_watcher.accept(client_tcp_watcher);
Some(~UvStream::new(client_tcp_watcher))
} else {
None
};
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
rtdebug!("resuming task from listen");
// Context switch
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
return result_cell.take();
}
}
pub struct UvStream(StreamWatcher);
impl UvStream {
fn new(watcher: StreamWatcher) -> UvStream {
UvStream(watcher)
}
fn watcher(&self) -> StreamWatcher {
match self { &UvStream(w) => w }
}
// XXX: finalize isn't working for ~UvStream???
fn close(&self) {
// XXX: Need to wait until this finishes before returning
self.watcher().close(||());
}
}
impl Drop for UvStream {
fn drop(&self) {
rtdebug!("closing stream");
//self.watcher().close(||());
}
}
impl Stream for UvStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, ()> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<uint, ()>> = &result_cell;
let scheduler = local_sched::take();
assert!(scheduler.in_task_context());
let watcher = self.watcher();
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |task| {
rtdebug!("read: entered scheduler context");
do local_sched::borrow |scheduler| {
assert!(!scheduler.in_task_context());
}
let mut watcher = watcher;
let task_cell = Cell::new(task);
// XXX: We shouldn't reallocate these callbacks every
// call to read
let alloc: AllocCallback = |_| unsafe {
slice_to_uv_buf(*buf_ptr)
};
do watcher.read_start(alloc) |watcher, nread, _buf, status| {
// Stop reading so that no read callbacks are
// triggered before the user calls `read` again.
// XXX: Is there a performance impact to calling
// stop here?
let mut watcher = watcher;
watcher.read_stop();
let result = if status.is_none() {
assert!(nread >= 0);
Ok(nread as uint)
} else {
Err(())
};
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
return result_cell.take();
}
fn write(&mut self, buf: &[u8]) -> Result<(), ()> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), ()>> = &result_cell;
let scheduler = local_sched::take();
assert!(scheduler.in_task_context());
let watcher = self.watcher();
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |task| {
let mut watcher = watcher;
let task_cell = Cell::new(task);
let buf = unsafe { &*buf_ptr };
// XXX: OMGCOPIES
let buf = buf.to_vec();
do watcher.write(buf) |_watcher, status| {
let result = if status.is_none() {
Ok(())
} else {
Err(())
};
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
return result_cell.take();
}
}
#[test]
fn test_simple_io_no_connect() {
do run_in_newsched_task {
let io = unsafe { local_sched::unsafe_borrow_io() };
let addr = next_test_ip4();
let maybe_chan = io.connect(addr);
assert!(maybe_chan.is_none());
}
}
#[test]
fn test_simple_tcp_server_and_client() {
do run_in_newsched_task {
let addr = next_test_ip4();
// Start the server first so it's listening when we connect
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut buf = [0, .. 2048];
let nread = stream.read(buf).unwrap();
assert!(nread == 8);
for uint::range(0, nread) |i| {
rtdebug!("%u", buf[i] as uint);
assert!(buf[i] == i as u8);
}
stream.close();
listener.close();
}
}
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut stream = io.connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.close();
}
}
}
}
#[test] #[ignore(reason = "busted")]
fn test_read_and_block() {
do run_in_newsched_task {
let addr = next_test_ip4();
do spawntask_immediately {
let io = unsafe { local_sched::unsafe_borrow_io() };
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut buf = [0, .. 2048];
let expected = 32;
let mut current = 0;
let mut reads = 0;
while current < expected {
let nread = stream.read(buf).unwrap();
for uint::range(0, nread) |i| {
let val = buf[i] as uint;
assert!(val == current % 8);
current += 1;
}
reads += 1;
let scheduler = local_sched::take();
// Yield to the other task in hopes that it
// will trigger a read callback while we are
// not ready for it
do scheduler.deschedule_running_task_and_then |task| {
let task = Cell::new(task);
do local_sched::borrow |scheduler| {
scheduler.task_queue.push_back(task.take());
}
}
}
// Make sure we had multiple reads
assert!(reads > 1);
stream.close();
listener.close();
}
do spawntask_immediately {
let io = unsafe { local_sched::unsafe_borrow_io() };
let mut stream = io.connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.close();
}
}
}
#[test]
fn test_read_read_read() {
do run_in_newsched_task {
let addr = next_test_ip4();
static MAX: uint = 500000;
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let buf = [1, .. 2048];
let mut total_bytes_written = 0;
while total_bytes_written < MAX {
stream.write(buf);
total_bytes_written += buf.len();
}
stream.close();
listener.close();
}
}
do spawntask_immediately {
let io = unsafe { local_sched::unsafe_borrow_io() };
let mut stream = io.connect(addr).unwrap();
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;
while total_bytes_read < MAX {
let nread = stream.read(buf).unwrap();
rtdebug!("read %u bytes", nread as uint);
total_bytes_read += nread;
for uint::range(0, nread) |i| {
assert!(buf[i] == 1);
}
}
rtdebug!("read %u bytes total", total_bytes_read as uint);
stream.close();
}
}
}

View File

@ -293,6 +293,118 @@ rust_uv_tcp_getpeername6
return uv_tcp_getpeername(handle, (sockaddr*)name, &namelen);
}
extern "C" int
rust_uv_tcp_getsockname
(uv_tcp_t* handle, sockaddr_in* name) {
int namelen = sizeof(sockaddr_in);
return uv_tcp_getsockname(handle, (sockaddr*)name, &namelen);
}
extern "C" int
rust_uv_tcp_getsockname6
(uv_tcp_t* handle, sockaddr_in6* name) {
int namelen = sizeof(sockaddr_in6);
return uv_tcp_getsockname(handle, (sockaddr*)name, &namelen);
}
extern "C" int
rust_uv_tcp_nodelay
(uv_tcp_t* handle, int enable) {
return uv_tcp_nodelay(handle, enable);
}
extern "C" int
rust_uv_tcp_keepalive
(uv_tcp_t* handle, int enable, unsigned int delay) {
return uv_tcp_keepalive(handle, enable, delay);
}
extern "C" int
rust_uv_tcp_simultaneous_accepts
(uv_tcp_t* handle, int enable) {
return uv_tcp_simultaneous_accepts(handle, enable);
}
extern "C" int
rust_uv_udp_init(uv_loop_t* loop, uv_udp_t* handle) {
return uv_udp_init(loop, handle);
}
extern "C" int
rust_uv_udp_bind(uv_udp_t* server, sockaddr_in* addr_ptr, unsigned flags) {
return uv_udp_bind(server, *addr_ptr, flags);
}
extern "C" int
rust_uv_udp_bind6(uv_udp_t* server, sockaddr_in6* addr_ptr, unsigned flags) {
return uv_udp_bind6(server, *addr_ptr, flags);
}
extern "C" int
rust_uv_udp_send(uv_udp_send_t* req, uv_udp_t* handle, uv_buf_t* buf_in,
int buf_cnt, sockaddr_in* addr_ptr, uv_udp_send_cb cb) {
return uv_udp_send(req, handle, buf_in, buf_cnt, *addr_ptr, cb);
}
extern "C" int
rust_uv_udp_send6(uv_udp_send_t* req, uv_udp_t* handle, uv_buf_t* buf_in,
int buf_cnt, sockaddr_in6* addr_ptr, uv_udp_send_cb cb) {
return uv_udp_send6(req, handle, buf_in, buf_cnt, *addr_ptr, cb);
}
extern "C" int
rust_uv_udp_recv_start(uv_udp_t* server, uv_alloc_cb on_alloc, uv_udp_recv_cb on_read) {
return uv_udp_recv_start(server, on_alloc, on_read);
}
extern "C" int
rust_uv_udp_recv_stop(uv_udp_t* server) {
return uv_udp_recv_stop(server);
}
extern "C" uv_udp_t*
rust_uv_get_udp_handle_from_send_req(uv_udp_send_t* send_req) {
return send_req->handle;
}
extern "C" int
rust_uv_udp_getsockname
(uv_udp_t* handle, sockaddr_in* name) {
int namelen = sizeof(sockaddr_in);
return uv_udp_getsockname(handle, (sockaddr*)name, &namelen);
}
extern "C" int
rust_uv_udp_getsockname6
(uv_udp_t* handle, sockaddr_in6* name) {
int namelen = sizeof(sockaddr_in6);
return uv_udp_getsockname(handle, (sockaddr*)name, &namelen);
}
extern "C" int
rust_uv_udp_set_membership
(uv_udp_t* handle, const char* m_addr, const char* i_addr, uv_membership membership) {
return uv_udp_set_membership(handle, m_addr, i_addr, membership);
}
extern "C" int
rust_uv_udp_set_multicast_loop
(uv_udp_t* handle, int on) {
return uv_udp_set_multicast_loop(handle, on);
}
extern "C" int
rust_uv_udp_set_multicast_ttl
(uv_udp_t* handle, int ttl) {
return uv_udp_set_multicast_ttl(handle, ttl);
}
extern "C" int
rust_uv_udp_set_broadcast
(uv_udp_t* handle, int on) {
return uv_udp_set_broadcast(handle, on);
}
extern "C" int
rust_uv_listen(uv_stream_t* stream, int backlog,
uv_connection_cb cb) {
@ -545,10 +657,34 @@ extern "C" void
rust_uv_freeaddrinfo(addrinfo* res) {
uv_freeaddrinfo(res);
}
extern "C" int
rust_uv_is_ipv4_sockaddr(sockaddr* addr) {
return addr->sa_family == AF_INET;
}
extern "C" int
rust_uv_is_ipv6_sockaddr(sockaddr* addr) {
return addr->sa_family == AF_INET6;
}
extern "C" sockaddr_in*
rust_uv_sockaddr_as_sockaddr_in(sockaddr* addr) {
// return (sockaddr_in*)addr->sa_data;
return (sockaddr_in*)addr;
}
extern "C" sockaddr_in6*
rust_uv_sockaddr_as_sockaddr_in6(sockaddr* addr) {
//return (sockaddr_in6*)addr->sa_data;
return (sockaddr_in6*)addr;
}
extern "C" bool
rust_uv_is_ipv4_addrinfo(addrinfo* input) {
return input->ai_family == AF_INET;
}
extern "C" bool
rust_uv_is_ipv6_addrinfo(addrinfo* input) {
return input->ai_family == AF_INET6;

View File

@ -106,6 +106,29 @@ rust_uv_tcp_connect
rust_uv_tcp_bind
rust_uv_tcp_connect6
rust_uv_tcp_bind6
rust_uv_tcp_getsockname
rust_uv_tcp_getsockname6
rust_uv_tcp_nodelay
rust_uv_tcp_keepalive
rust_uv_tcp_simultaneous_accepts
rust_uv_udp_init
rust_uv_udp_bind
rust_uv_udp_bind6
rust_uv_udp_send
rust_uv_udp_send6
rust_uv_udp_recv_start
rust_uv_udp_recv_stop
rust_uv_get_udp_handle_from_send_req
rust_uv_udp_getsockname
rust_uv_udp_getsockname6
rust_uv_udp_set_membership
rust_uv_udp_set_multicast_loop
rust_uv_udp_set_multicast_ttl
rust_uv_udp_set_broadcast
rust_uv_is_ipv4_sockaddr
rust_uv_is_ipv6_sockaddr
rust_uv_sockaddr_as_sockaddr_in
rust_uv_sockaddr_as_sockaddr_in6
rust_uv_listen
rust_uv_accept
rust_uv_write