rust/src/librustuv/net.rs

1316 lines
44 KiB
Rust
Raw Normal View History

// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use std::cast;
use std::io::{IoError, IoResult};
use std::io::net::ip;
2014-02-26 11:58:41 -06:00
use libc::{size_t, ssize_t, c_int, c_void, c_uint};
use libc;
use std::mem;
use std::ptr;
use std::rt::rtio;
use std::rt::task::BlockedTask;
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
use access::Access;
use homing::{HomingIO, HomeHandle};
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
use rc::Refcount;
2013-11-05 13:29:45 -06:00
use stream::StreamWatcher;
use super::{Loop, Request, UvError, Buf, status_to_io_result,
uv_error_to_io_error, UvHandle, slice_to_uv_buf,
wait_until_woken_after, wakeup};
use timer::TimerWatcher;
use uvio::UvIoFactory;
2013-11-05 13:29:45 -06:00
use uvll;
////////////////////////////////////////////////////////////////////////////////
/// Generic functions related to dealing with sockaddr things
////////////////////////////////////////////////////////////////////////////////
pub fn htons(u: u16) -> u16 { mem::to_be16(u) }
pub fn ntohs(u: u16) -> u16 { mem::from_be16(u) }
pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
len: uint) -> ip::SocketAddr {
match storage.ss_family as c_int {
libc::AF_INET => {
assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
let storage: &libc::sockaddr_in = unsafe {
cast::transmute(storage)
};
let addr = storage.sin_addr.s_addr as u32;
let a = (addr >> 0) as u8;
let b = (addr >> 8) as u8;
let c = (addr >> 16) as u8;
let d = (addr >> 24) as u8;
ip::SocketAddr {
ip: ip::Ipv4Addr(a, b, c, d),
port: ntohs(storage.sin_port),
}
}
libc::AF_INET6 => {
assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
let storage: &libc::sockaddr_in6 = unsafe {
cast::transmute(storage)
};
let a = ntohs(storage.sin6_addr.s6_addr[0]);
let b = ntohs(storage.sin6_addr.s6_addr[1]);
let c = ntohs(storage.sin6_addr.s6_addr[2]);
let d = ntohs(storage.sin6_addr.s6_addr[3]);
let e = ntohs(storage.sin6_addr.s6_addr[4]);
let f = ntohs(storage.sin6_addr.s6_addr[5]);
let g = ntohs(storage.sin6_addr.s6_addr[6]);
let h = ntohs(storage.sin6_addr.s6_addr[7]);
ip::SocketAddr {
ip: ip::Ipv6Addr(a, b, c, d, e, f, g, h),
port: ntohs(storage.sin6_port),
}
}
n => {
fail!("unknown family {}", n);
}
}
}
fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) {
unsafe {
let mut storage: libc::sockaddr_storage = mem::init();
let len = match addr.ip {
ip::Ipv4Addr(a, b, c, d) => {
let storage: &mut libc::sockaddr_in =
cast::transmute(&mut storage);
(*storage).sin_family = libc::AF_INET as libc::sa_family_t;
(*storage).sin_port = htons(addr.port);
(*storage).sin_addr = libc::in_addr {
s_addr: (d as u32 << 24) |
(c as u32 << 16) |
(b as u32 << 8) |
(a as u32 << 0)
};
mem::size_of::<libc::sockaddr_in>()
}
ip::Ipv6Addr(a, b, c, d, e, f, g, h) => {
let storage: &mut libc::sockaddr_in6 =
cast::transmute(&mut storage);
storage.sin6_family = libc::AF_INET6 as libc::sa_family_t;
storage.sin6_port = htons(addr.port);
storage.sin6_addr = libc::in6_addr {
s6_addr: [
htons(a),
htons(b),
htons(c),
htons(d),
htons(e),
htons(f),
htons(g),
htons(h),
]
};
mem::size_of::<libc::sockaddr_in6>()
}
};
return (storage, len);
}
}
enum SocketNameKind {
TcpPeer,
Tcp,
Udp
}
fn socket_name(sk: SocketNameKind,
handle: *c_void) -> Result<ip::SocketAddr, IoError> {
let getsockname = match sk {
TcpPeer => uvll::uv_tcp_getpeername,
Tcp => uvll::uv_tcp_getsockname,
Udp => uvll::uv_udp_getsockname,
};
// Allocate a sockaddr_storage since we don't know if it's ipv4 or ipv6
let mut sockaddr: libc::sockaddr_storage = unsafe { mem::init() };
let mut namelen = mem::size_of::<libc::sockaddr_storage>() as c_int;
let sockaddr_p = &mut sockaddr as *mut libc::sockaddr_storage;
match unsafe {
getsockname(handle, sockaddr_p as *mut libc::sockaddr, &mut namelen)
} {
0 => Ok(sockaddr_to_addr(&sockaddr, namelen as uint)),
n => Err(uv_error_to_io_error(UvError(n)))
}
}
////////////////////////////////////////////////////////////////////////////////
// Helpers for handling timeouts, shared for pipes/tcp
////////////////////////////////////////////////////////////////////////////////
pub struct ConnectCtx {
pub status: c_int,
pub task: Option<BlockedTask>,
pub timer: Option<~TimerWatcher>,
}
pub struct AcceptTimeout {
timer: Option<TimerWatcher>,
timeout_tx: Option<Sender<()>>,
timeout_rx: Option<Receiver<()>>,
}
impl ConnectCtx {
pub fn connect<T>(
mut self, obj: T, timeout: Option<u64>, io: &mut UvIoFactory,
f: |&Request, &T, uvll::uv_connect_cb| -> libc::c_int
) -> Result<T, UvError> {
let mut req = Request::new(uvll::UV_CONNECT);
let r = f(&req, &obj, connect_cb);
return match r {
0 => {
req.defuse(); // uv callback now owns this request
match timeout {
Some(t) => {
let mut timer = TimerWatcher::new(io);
timer.start(timer_cb, t, 0);
self.timer = Some(timer);
}
None => {}
}
wait_until_woken_after(&mut self.task, &io.loop_, || {
let data = &self as *_;
match self.timer {
Some(ref mut timer) => unsafe { timer.set_data(data) },
None => {}
}
req.set_data(data);
});
// Make sure an erroneously fired callback doesn't have access
// to the context any more.
req.set_data(0 as *int);
// If we failed because of a timeout, drop the TcpWatcher as
// soon as possible because it's data is now set to null and we
// want to cancel the callback ASAP.
match self.status {
0 => Ok(obj),
n => { drop(obj); Err(UvError(n)) }
}
}
n => Err(UvError(n))
};
extern fn timer_cb(handle: *uvll::uv_timer_t) {
// Don't close the corresponding tcp request, just wake up the task
// and let RAII take care of the pending watcher.
let cx: &mut ConnectCtx = unsafe {
&mut *(uvll::get_data_for_uv_handle(handle) as *mut ConnectCtx)
};
cx.status = uvll::ECANCELED;
wakeup(&mut cx.task);
}
extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
// This callback can be invoked with ECANCELED if the watcher is
// closed by the timeout callback. In that case we just want to free
// the request and be along our merry way.
2013-11-05 13:29:45 -06:00
let req = Request::wrap(req);
if status == uvll::ECANCELED { return }
// Apparently on windows when the handle is closed this callback may
// not be invoked with ECANCELED but rather another error code.
// Either ways, if the data is null, then our timeout has expired
// and there's nothing we can do.
let data = unsafe { uvll::get_data_for_req(req.handle) };
if data.is_null() { return }
let cx: &mut ConnectCtx = unsafe { &mut *(data as *mut ConnectCtx) };
cx.status = status;
match cx.timer {
Some(ref mut t) => t.stop(),
None => {}
}
// Note that the timer callback doesn't cancel the connect request
// (that's the job of uv_close()), so it's possible for this
// callback to get triggered after the timeout callback fires, but
// before the task wakes up. In that case, we did indeed
// successfully connect, but we don't need to wake someone up. We
// updated the status above (correctly so), and the task will pick
// up on this when it wakes up.
if cx.task.is_some() {
wakeup(&mut cx.task);
}
}
}
}
impl AcceptTimeout {
pub fn new() -> AcceptTimeout {
AcceptTimeout { timer: None, timeout_tx: None, timeout_rx: None }
}
pub fn accept<T: Send>(&mut self, c: &Receiver<IoResult<T>>) -> IoResult<T> {
match self.timeout_rx {
None => c.recv(),
Some(ref rx) => {
use std::comm::Select;
// Poll the incoming channel first (don't rely on the order of
// select just yet). If someone's pending then we should return
// them immediately.
match c.try_recv() {
Ok(data) => return data,
Err(..) => {}
}
// Use select to figure out which channel gets ready first. We
// do some custom handling of select to ensure that we never
// actually drain the timeout channel (we'll keep seeing the
// timeout message in the future).
let s = Select::new();
let mut timeout = s.handle(rx);
let mut data = s.handle(c);
unsafe {
timeout.add();
data.add();
}
if s.wait() == timeout.id() {
Err(uv_error_to_io_error(UvError(uvll::ECANCELED)))
} else {
c.recv()
}
}
}
}
pub fn clear(&mut self) {
// Clear any previous timeout by dropping the timer and transmission
// channels
drop((self.timer.take(),
self.timeout_tx.take(),
self.timeout_rx.take()))
}
pub fn set_timeout<U, T: UvHandle<U> + HomingIO>(
&mut self, ms: u64, t: &mut T
) {
// If we have a timeout, lazily initialize the timer which will be used
// to fire when the timeout runs out.
if self.timer.is_none() {
let _m = t.fire_homing_missile();
let loop_ = Loop::wrap(unsafe {
uvll::get_loop_for_uv_handle(t.uv_handle())
});
let mut timer = TimerWatcher::new_home(&loop_, t.home().clone());
unsafe {
timer.set_data(self as *mut _ as *AcceptTimeout);
}
self.timer = Some(timer);
}
// Once we've got a timer, stop any previous timeout, reset it for the
// current one, and install some new channels to send/receive data on
let timer = self.timer.get_mut_ref();
timer.stop();
timer.start(timer_cb, ms, 0);
let (tx, rx) = channel();
self.timeout_tx = Some(tx);
self.timeout_rx = Some(rx);
extern fn timer_cb(timer: *uvll::uv_timer_t) {
let acceptor: &mut AcceptTimeout = unsafe {
&mut *(uvll::get_data_for_uv_handle(timer) as *mut AcceptTimeout)
};
// This send can never fail because if this timer is active then the
// receiving channel is guaranteed to be alive
acceptor.timeout_tx.get_ref().send(());
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// TCP implementation
////////////////////////////////////////////////////////////////////////////////
pub struct TcpWatcher {
handle: *uvll::uv_tcp_t,
stream: StreamWatcher,
home: HomeHandle,
refcount: Refcount,
// libuv can't support concurrent reads and concurrent writes of the same
// stream object, so we use these access guards in order to arbitrate among
// multiple concurrent reads and writes. Note that libuv *can* read and
// write simultaneously, it just can't read and read simultaneously.
read_access: Access,
write_access: Access,
}
pub struct TcpListener {
home: HomeHandle,
handle: *uvll::uv_pipe_t,
closing_task: Option<BlockedTask>,
outgoing: Sender<Result<~rtio::RtioTcpStream:Send, IoError>>,
incoming: Receiver<Result<~rtio::RtioTcpStream:Send, IoError>>,
}
pub struct TcpAcceptor {
listener: ~TcpListener,
timeout: AcceptTimeout,
}
// TCP watchers (clients/streams)
impl TcpWatcher {
pub fn new(io: &mut UvIoFactory) -> TcpWatcher {
let handle = io.make_handle();
TcpWatcher::new_home(&io.loop_, handle)
}
fn new_home(loop_: &Loop, home: HomeHandle) -> TcpWatcher {
let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
assert_eq!(unsafe {
uvll::uv_tcp_init(loop_.handle, handle)
}, 0);
TcpWatcher {
home: home,
handle: handle,
stream: StreamWatcher::new(handle),
refcount: Refcount::new(),
read_access: Access::new(),
write_access: Access::new(),
}
}
pub fn connect(io: &mut UvIoFactory,
address: ip::SocketAddr,
timeout: Option<u64>) -> Result<TcpWatcher, UvError> {
let tcp = TcpWatcher::new(io);
let cx = ConnectCtx { status: -1, task: None, timer: None };
let (addr, _len) = addr_to_sockaddr(address);
let addr_p = &addr as *_ as *libc::sockaddr;
cx.connect(tcp, timeout, io, |req, tcp, cb| {
unsafe { uvll::uv_tcp_connect(req.handle, tcp.handle, addr_p, cb) }
})
}
}
impl HomingIO for TcpWatcher {
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
}
impl rtio::RtioSocket for TcpWatcher {
fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
let _m = self.fire_homing_missile();
socket_name(Tcp, self.handle)
}
}
impl rtio::RtioTcpStream for TcpWatcher {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
let m = self.fire_homing_missile();
let _g = self.read_access.grant(m);
self.stream.read(buf).map_err(uv_error_to_io_error)
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
let m = self.fire_homing_missile();
let _g = self.write_access.grant(m);
self.stream.write(buf).map_err(uv_error_to_io_error)
}
fn peer_name(&mut self) -> Result<ip::SocketAddr, IoError> {
let _m = self.fire_homing_missile();
socket_name(TcpPeer, self.handle)
}
fn control_congestion(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_tcp_nodelay(self.handle, 0 as c_int)
})
}
fn nodelay(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_tcp_nodelay(self.handle, 1 as c_int)
})
}
fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_tcp_keepalive(self.handle, 1 as c_int,
delay_in_seconds as c_uint)
})
}
fn letdie(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_tcp_keepalive(self.handle, 0 as c_int, 0 as c_uint)
})
}
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
fn clone(&self) -> ~rtio::RtioTcpStream:Send {
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
~TcpWatcher {
handle: self.handle,
stream: StreamWatcher::new(self.handle),
home: self.home.clone(),
refcount: self.refcount.clone(),
write_access: self.write_access.clone(),
read_access: self.read_access.clone(),
} as ~rtio::RtioTcpStream:Send
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
}
fn close_write(&mut self) -> Result<(), IoError> {
struct Ctx {
slot: Option<BlockedTask>,
status: c_int,
}
let mut req = Request::new(uvll::UV_SHUTDOWN);
return match unsafe {
uvll::uv_shutdown(req.handle, self.handle, shutdown_cb)
} {
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { slot: None, status: 0 };
wait_until_woken_after(&mut cx.slot, &self.uv_loop(), || {
req.set_data(&cx);
});
status_to_io_result(cx.status)
}
n => Err(uv_error_to_io_error(UvError(n)))
};
extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
let req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
let cx: &mut Ctx = unsafe { req.get_data() };
cx.status = status;
wakeup(&mut cx.slot);
}
}
}
impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
fn uv_handle(&self) -> *uvll::uv_tcp_t { self.stream.handle }
}
impl Drop for TcpWatcher {
fn drop(&mut self) {
let _m = self.fire_homing_missile();
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
if self.refcount.decrement() {
self.close();
}
}
}
// TCP listeners (unbound servers)
impl TcpListener {
pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
-> Result<~TcpListener, UvError> {
let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
assert_eq!(unsafe {
uvll::uv_tcp_init(io.uv_loop(), handle)
}, 0);
let (tx, rx) = channel();
let l = ~TcpListener {
home: io.make_handle(),
handle: handle,
closing_task: None,
outgoing: tx,
incoming: rx,
};
let (addr, _len) = addr_to_sockaddr(address);
let res = unsafe {
let addr_p = &addr as *libc::sockaddr_storage;
uvll::uv_tcp_bind(l.handle, addr_p as *libc::sockaddr)
};
return match res {
0 => Ok(l.install()),
n => Err(UvError(n))
};
}
}
impl HomingIO for TcpListener {
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
}
impl UvHandle<uvll::uv_tcp_t> for TcpListener {
fn uv_handle(&self) -> *uvll::uv_tcp_t { self.handle }
}
impl rtio::RtioSocket for TcpListener {
fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
let _m = self.fire_homing_missile();
socket_name(Tcp, self.handle)
}
}
impl rtio::RtioTcpListener for TcpListener {
fn listen(~self) -> Result<~rtio::RtioTcpAcceptor:Send, IoError> {
// create the acceptor object from ourselves
let mut acceptor = ~TcpAcceptor {
listener: self,
timeout: AcceptTimeout::new(),
};
let _m = acceptor.fire_homing_missile();
// FIXME: the 128 backlog should be configurable
match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
0 => Ok(acceptor as ~rtio::RtioTcpAcceptor:Send),
n => Err(uv_error_to_io_error(UvError(n))),
}
}
}
extern fn listen_cb(server: *uvll::uv_stream_t, status: c_int) {
assert!(status != uvll::ECANCELED);
let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
let msg = match status {
0 => {
2013-11-05 13:29:45 -06:00
let loop_ = Loop::wrap(unsafe {
uvll::get_loop_for_uv_handle(server)
});
let client = TcpWatcher::new_home(&loop_, tcp.home().clone());
assert_eq!(unsafe { uvll::uv_accept(server, client.handle) }, 0);
Ok(~client as ~rtio::RtioTcpStream:Send)
}
n => Err(uv_error_to_io_error(UvError(n)))
};
tcp.outgoing.send(msg);
}
impl Drop for TcpListener {
fn drop(&mut self) {
let _m = self.fire_homing_missile();
self.close();
}
}
// TCP acceptors (bound servers)
impl HomingIO for TcpAcceptor {
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
}
impl rtio::RtioSocket for TcpAcceptor {
fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
let _m = self.fire_homing_missile();
socket_name(Tcp, self.listener.handle)
}
}
impl rtio::RtioTcpAcceptor for TcpAcceptor {
fn accept(&mut self) -> Result<~rtio::RtioTcpStream:Send, IoError> {
self.timeout.accept(&self.listener.incoming)
}
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
})
}
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
})
}
fn set_timeout(&mut self, ms: Option<u64>) {
match ms {
None => self.timeout.clear(),
Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// UDP implementation
////////////////////////////////////////////////////////////////////////////////
pub struct UdpWatcher {
handle: *uvll::uv_udp_t,
home: HomeHandle,
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
// See above for what these fields are
refcount: Refcount,
read_access: Access,
write_access: Access,
}
impl UdpWatcher {
pub fn bind(io: &mut UvIoFactory, address: ip::SocketAddr)
-> Result<UdpWatcher, UvError> {
let udp = UdpWatcher {
handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
home: io.make_handle(),
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
refcount: Refcount::new(),
read_access: Access::new(),
write_access: Access::new(),
};
assert_eq!(unsafe {
uvll::uv_udp_init(io.uv_loop(), udp.handle)
}, 0);
let (addr, _len) = addr_to_sockaddr(address);
let result = unsafe {
let addr_p = &addr as *libc::sockaddr_storage;
uvll::uv_udp_bind(udp.handle, addr_p as *libc::sockaddr, 0u32)
};
return match result {
0 => Ok(udp),
n => Err(UvError(n)),
};
}
}
impl UvHandle<uvll::uv_udp_t> for UdpWatcher {
fn uv_handle(&self) -> *uvll::uv_udp_t { self.handle }
}
impl HomingIO for UdpWatcher {
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
}
impl rtio::RtioSocket for UdpWatcher {
fn socket_name(&mut self) -> Result<ip::SocketAddr, IoError> {
let _m = self.fire_homing_missile();
socket_name(Udp, self.handle)
}
}
impl rtio::RtioUdpSocket for UdpWatcher {
fn recvfrom(&mut self, buf: &mut [u8])
-> Result<(uint, ip::SocketAddr), IoError>
{
struct Ctx {
task: Option<BlockedTask>,
buf: Option<Buf>,
result: Option<(ssize_t, Option<ip::SocketAddr>)>,
}
let loop_ = self.uv_loop();
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
let m = self.fire_homing_missile();
let _g = self.read_access.grant(m);
let a = match unsafe {
uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
} {
0 => {
let mut cx = Ctx {
task: None,
buf: Some(slice_to_uv_buf(buf)),
result: None,
};
let handle = self.handle;
wait_until_woken_after(&mut cx.task, &loop_, || {
unsafe { uvll::set_data_for_uv_handle(handle, &cx) }
});
match cx.result.take_unwrap() {
(n, _) if n < 0 =>
Err(uv_error_to_io_error(UvError(n as c_int))),
(n, addr) => Ok((n as uint, addr.unwrap()))
}
}
n => Err(uv_error_to_io_error(UvError(n)))
};
return a;
extern fn alloc_cb(handle: *uvll::uv_udp_t,
_suggested_size: size_t,
buf: *mut Buf) {
unsafe {
let cx: &mut Ctx =
cast::transmute(uvll::get_data_for_uv_handle(handle));
*buf = cx.buf.take().expect("recv alloc_cb called more than once")
}
}
extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
addr: *libc::sockaddr, _flags: c_uint) {
assert!(nread != uvll::ECANCELED as ssize_t);
let cx: &mut Ctx = unsafe {
cast::transmute(uvll::get_data_for_uv_handle(handle))
};
// When there's no data to read the recv callback can be a no-op.
// This can happen if read returns EAGAIN/EWOULDBLOCK. By ignoring
// this we just drop back to kqueue and wait for the next callback.
if nread == 0 {
cx.buf = Some(unsafe { *buf });
return
}
unsafe {
assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
}
let cx: &mut Ctx = unsafe {
cast::transmute(uvll::get_data_for_uv_handle(handle))
};
let addr = if addr == ptr::null() {
None
} else {
let len = mem::size_of::<libc::sockaddr_storage>();
Some(sockaddr_to_addr(unsafe { cast::transmute(addr) }, len))
};
cx.result = Some((nread, addr));
wakeup(&mut cx.task);
}
}
fn sendto(&mut self, buf: &[u8], dst: ip::SocketAddr) -> Result<(), IoError> {
struct Ctx { task: Option<BlockedTask>, result: c_int }
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
let m = self.fire_homing_missile();
let loop_ = self.uv_loop();
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
let _g = self.write_access.grant(m);
let mut req = Request::new(uvll::UV_UDP_SEND);
let buf = slice_to_uv_buf(buf);
let (addr, _len) = addr_to_sockaddr(dst);
let result = unsafe {
let addr_p = &addr as *libc::sockaddr_storage;
uvll::uv_udp_send(req.handle, self.handle, [buf],
addr_p as *libc::sockaddr, send_cb)
};
return match result {
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { task: None, result: 0 };
wait_until_woken_after(&mut cx.task, &loop_, || {
req.set_data(&cx);
});
match cx.result {
0 => Ok(()),
n => Err(uv_error_to_io_error(UvError(n)))
}
}
n => Err(uv_error_to_io_error(UvError(n)))
};
extern fn send_cb(req: *uvll::uv_udp_send_t, status: c_int) {
let req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
let cx: &mut Ctx = unsafe { req.get_data() };
cx.result = status;
wakeup(&mut cx.task);
}
}
fn join_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
multi.to_str().with_c_str(|m_addr| {
uvll::uv_udp_set_membership(self.handle,
m_addr, ptr::null(),
uvll::UV_JOIN_GROUP)
})
})
}
fn leave_multicast(&mut self, multi: ip::IpAddr) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
multi.to_str().with_c_str(|m_addr| {
uvll::uv_udp_set_membership(self.handle,
m_addr, ptr::null(),
uvll::UV_LEAVE_GROUP)
})
})
}
fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_udp_set_multicast_loop(self.handle,
1 as c_int)
})
}
fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_udp_set_multicast_loop(self.handle,
0 as c_int)
})
}
fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_udp_set_multicast_ttl(self.handle,
ttl as c_int)
})
}
fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_udp_set_ttl(self.handle, ttl as c_int)
})
}
fn hear_broadcasts(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_udp_set_broadcast(self.handle,
1 as c_int)
})
}
fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_udp_set_broadcast(self.handle,
0 as c_int)
})
}
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
fn clone(&self) -> ~rtio::RtioUdpSocket:Send {
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
~UdpWatcher {
handle: self.handle,
home: self.home.clone(),
refcount: self.refcount.clone(),
write_access: self.write_access.clone(),
read_access: self.read_access.clone(),
} as ~rtio::RtioUdpSocket:Send
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
}
}
impl Drop for UdpWatcher {
fn drop(&mut self) {
// Send ourselves home to close this handle (blocking while doing so).
let _m = self.fire_homing_missile();
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
if self.refcount.decrement() {
self.close();
}
}
}
#[cfg(test)]
mod test {
2013-11-06 13:03:11 -06:00
use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
RtioUdpSocket};
use std::io::test::{next_test_ip4, next_test_ip6};
2013-11-06 13:03:11 -06:00
use super::{UdpWatcher, TcpWatcher, TcpListener};
use super::super::local_loop;
#[test]
fn connect_close_ip4() {
match TcpWatcher::connect(local_loop(), next_test_ip4(), None) {
2013-11-28 14:22:53 -06:00
Ok(..) => fail!(),
2014-04-15 20:17:48 -05:00
Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
}
}
#[test]
fn connect_close_ip6() {
match TcpWatcher::connect(local_loop(), next_test_ip6(), None) {
2013-11-28 14:22:53 -06:00
Ok(..) => fail!(),
2014-04-15 20:17:48 -05:00
Err(e) => assert_eq!(e.name(), "ECONNREFUSED".to_owned()),
}
}
#[test]
fn udp_bind_close_ip4() {
match UdpWatcher::bind(local_loop(), next_test_ip4()) {
2013-11-28 14:22:53 -06:00
Ok(..) => {}
Err(..) => fail!()
}
}
#[test]
fn udp_bind_close_ip6() {
match UdpWatcher::bind(local_loop(), next_test_ip6()) {
2013-11-28 14:22:53 -06:00
Ok(..) => {}
Err(..) => fail!()
}
}
#[test]
fn listen_ip4() {
let (tx, rx) = channel();
let addr = next_test_ip4();
2014-01-26 21:57:42 -06:00
spawn(proc() {
let w = match TcpListener::bind(local_loop(), addr) {
2013-11-06 13:03:11 -06:00
Ok(w) => w, Err(e) => fail!("{:?}", e)
};
let mut w = match w.listen() {
Ok(w) => w, Err(e) => fail!("{:?}", e),
};
tx.send(());
match w.accept() {
Ok(mut stream) => {
let mut buf = [0u8, ..10];
match stream.read(buf) {
Ok(10) => {} e => fail!("{:?}", e),
}
for i in range(0, 10u8) {
2014-04-01 22:39:26 -05:00
assert_eq!(buf[i as uint], i + 1);
}
}
Err(e) => fail!("{:?}", e)
2013-11-06 13:03:11 -06:00
}
2014-01-26 21:57:42 -06:00
});
rx.recv();
let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
Ok(w) => w, Err(e) => fail!("{:?}", e)
};
match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
Ok(()) => {}, Err(e) => fail!("{:?}", e)
}
}
#[test]
fn listen_ip6() {
let (tx, rx) = channel();
let addr = next_test_ip6();
2013-11-06 13:03:11 -06:00
2014-01-26 21:57:42 -06:00
spawn(proc() {
let w = match TcpListener::bind(local_loop(), addr) {
2013-11-06 13:03:11 -06:00
Ok(w) => w, Err(e) => fail!("{:?}", e)
};
let mut w = match w.listen() {
Ok(w) => w, Err(e) => fail!("{:?}", e),
};
tx.send(());
match w.accept() {
Ok(mut stream) => {
let mut buf = [0u8, ..10];
match stream.read(buf) {
Ok(10) => {} e => fail!("{:?}", e),
}
for i in range(0, 10u8) {
2014-04-01 22:39:26 -05:00
assert_eq!(buf[i as uint], i + 1);
}
}
Err(e) => fail!("{:?}", e)
2013-11-06 13:03:11 -06:00
}
2014-01-26 21:57:42 -06:00
});
rx.recv();
let mut w = match TcpWatcher::connect(local_loop(), addr, None) {
Ok(w) => w, Err(e) => fail!("{:?}", e)
};
match w.write([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) {
Ok(()) => {}, Err(e) => fail!("{:?}", e)
}
2013-11-06 13:03:11 -06:00
}
#[test]
fn udp_recv_ip4() {
let (tx, rx) = channel();
let client = next_test_ip4();
let server = next_test_ip4();
2014-01-26 21:57:42 -06:00
spawn(proc() {
match UdpWatcher::bind(local_loop(), server) {
Ok(mut w) => {
tx.send(());
let mut buf = [0u8, ..10];
match w.recvfrom(buf) {
Ok((10, addr)) => assert_eq!(addr, client),
e => fail!("{:?}", e),
}
for i in range(0, 10u8) {
2014-04-01 22:39:26 -05:00
assert_eq!(buf[i as uint], i + 1);
}
}
Err(e) => fail!("{:?}", e)
2013-11-06 13:03:11 -06:00
}
2014-01-26 21:57:42 -06:00
});
2013-11-06 13:03:11 -06:00
rx.recv();
let mut w = match UdpWatcher::bind(local_loop(), client) {
Ok(w) => w, Err(e) => fail!("{:?}", e)
};
match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
Ok(()) => {}, Err(e) => fail!("{:?}", e)
2013-11-06 13:03:11 -06:00
}
}
#[test]
fn udp_recv_ip6() {
let (tx, rx) = channel();
let client = next_test_ip6();
let server = next_test_ip6();
2014-01-26 21:57:42 -06:00
spawn(proc() {
match UdpWatcher::bind(local_loop(), server) {
Ok(mut w) => {
tx.send(());
let mut buf = [0u8, ..10];
match w.recvfrom(buf) {
Ok((10, addr)) => assert_eq!(addr, client),
e => fail!("{:?}", e),
}
for i in range(0, 10u8) {
2014-04-01 22:39:26 -05:00
assert_eq!(buf[i as uint], i + 1);
2013-11-06 13:03:11 -06:00
}
}
Err(e) => fail!("{:?}", e)
2013-11-06 13:03:11 -06:00
}
2014-01-26 21:57:42 -06:00
});
rx.recv();
let mut w = match UdpWatcher::bind(local_loop(), client) {
Ok(w) => w, Err(e) => fail!("{:?}", e)
};
match w.sendto([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], server) {
Ok(()) => {}, Err(e) => fail!("{:?}", e)
}
}
#[test]
2013-11-06 13:03:11 -06:00
fn test_read_read_read() {
let addr = next_test_ip4();
static MAX: uint = 5000;
let (tx, rx) = channel();
2014-01-26 21:57:42 -06:00
spawn(proc() {
let listener = TcpListener::bind(local_loop(), addr).unwrap();
let mut acceptor = listener.listen().unwrap();
tx.send(());
let mut stream = acceptor.accept().unwrap();
let buf = [1, .. 2048];
let mut total_bytes_written = 0;
while total_bytes_written < MAX {
assert!(stream.write(buf).is_ok());
uvdebug!("wrote bytes");
total_bytes_written += buf.len();
2013-11-06 13:03:11 -06:00
}
2014-01-26 21:57:42 -06:00
});
2013-11-06 13:03:11 -06:00
rx.recv();
let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;
while total_bytes_read < MAX {
let nread = stream.read(buf).unwrap();
total_bytes_read += nread;
for i in range(0u, nread) {
assert_eq!(buf[i], 1);
2013-11-06 13:03:11 -06:00
}
}
uvdebug!("read {} bytes total", total_bytes_read);
2013-11-06 13:03:11 -06:00
}
#[test]
2013-11-09 13:02:16 -06:00
#[ignore(cfg(windows))] // FIXME(#10102) server never sees second packet
2013-11-06 13:03:11 -06:00
fn test_udp_twice() {
let server_addr = next_test_ip4();
let client_addr = next_test_ip4();
let (tx, rx) = channel();
2014-01-26 21:57:42 -06:00
spawn(proc() {
let mut client = UdpWatcher::bind(local_loop(), client_addr).unwrap();
rx.recv();
assert!(client.sendto([1], server_addr).is_ok());
assert!(client.sendto([2], server_addr).is_ok());
2014-01-26 21:57:42 -06:00
});
let mut server = UdpWatcher::bind(local_loop(), server_addr).unwrap();
tx.send(());
let mut buf1 = [0];
let mut buf2 = [0];
let (nread1, src1) = server.recvfrom(buf1).unwrap();
let (nread2, src2) = server.recvfrom(buf2).unwrap();
assert_eq!(nread1, 1);
assert_eq!(nread2, 1);
assert_eq!(src1, client_addr);
assert_eq!(src2, client_addr);
assert_eq!(buf1[0], 1);
assert_eq!(buf2[0], 2);
2013-11-06 13:03:11 -06:00
}
2013-11-06 13:03:11 -06:00
#[test]
fn test_udp_many_read() {
let server_out_addr = next_test_ip4();
let server_in_addr = next_test_ip4();
let client_out_addr = next_test_ip4();
let client_in_addr = next_test_ip4();
static MAX: uint = 500_000;
let (tx1, rx1) = channel::<()>();
let (tx2, rx2) = channel::<()>();
2014-01-26 21:57:42 -06:00
spawn(proc() {
let l = local_loop();
let mut server_out = UdpWatcher::bind(l, server_out_addr).unwrap();
let mut server_in = UdpWatcher::bind(l, server_in_addr).unwrap();
let (tx, rx) = (tx2, rx1);
tx.send(());
rx.recv();
let msg = [1, .. 2048];
let mut total_bytes_sent = 0;
let mut buf = [1];
while buf[0] == 1 {
// send more data
assert!(server_out.sendto(msg, client_in_addr).is_ok());
total_bytes_sent += msg.len();
// check if the client has received enough
let res = server_in.recvfrom(buf);
assert!(res.is_ok());
let (nread, src) = res.unwrap();
assert_eq!(nread, 1);
assert_eq!(src, client_out_addr);
2013-11-06 13:03:11 -06:00
}
assert!(total_bytes_sent >= MAX);
2014-01-26 21:57:42 -06:00
});
let l = local_loop();
let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
let (tx, rx) = (tx1, rx2);
rx.recv();
tx.send(());
let mut total_bytes_recv = 0;
let mut buf = [0, .. 2048];
while total_bytes_recv < MAX {
// ask for more
assert!(client_out.sendto([1], server_in_addr).is_ok());
// wait for data
let res = client_in.recvfrom(buf);
assert!(res.is_ok());
let (nread, src) = res.unwrap();
assert_eq!(src, server_out_addr);
total_bytes_recv += nread;
for i in range(0u, nread) {
assert_eq!(buf[i], 1);
2013-11-06 13:03:11 -06:00
}
}
// tell the server we're done
assert!(client_out.sendto([0], server_in_addr).is_ok());
2013-11-06 13:03:11 -06:00
}
#[test]
fn test_read_and_block() {
let addr = next_test_ip4();
let (tx, rx) = channel::<Receiver<()>>();
2014-01-26 21:57:42 -06:00
spawn(proc() {
let rx = rx.recv();
let mut stream = TcpWatcher::connect(local_loop(), addr, None).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
rx.recv();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
rx.recv();
2014-01-26 21:57:42 -06:00
});
let listener = TcpListener::bind(local_loop(), addr).unwrap();
let mut acceptor = listener.listen().unwrap();
let (tx2, rx2) = channel();
tx.send(rx2);
let mut stream = acceptor.accept().unwrap();
let mut buf = [0, .. 2048];
let expected = 32;
let mut current = 0;
let mut reads = 0;
while current < expected {
let nread = stream.read(buf).unwrap();
for i in range(0u, nread) {
let val = buf[i] as uint;
assert_eq!(val, current % 8);
current += 1;
}
reads += 1;
std: Make std::comm return types consistent There are currently a number of return values from the std::comm methods, not all of which are necessarily completely expressive: Sender::try_send(t: T) -> bool This method currently doesn't transmit back the data `t` if the send fails due to the other end having disconnected. Additionally, this shares the name of the synchronous try_send method, but it differs in semantics in that it only has one failure case, not two (the buffer can never be full). SyncSender::try_send(t: T) -> TrySendResult<T> This method accurately conveys all possible information, but it uses a custom type to the std::comm module with no convenience methods on it. Additionally, if you want to inspect the result you're forced to import something from `std::comm`. SyncSender::send_opt(t: T) -> Option<T> This method uses Some(T) as an "error value" and None as a "success value", but almost all other uses of Option<T> have Some/None the other way Receiver::try_recv(t: T) -> TryRecvResult<T> Similarly to the synchronous try_send, this custom return type is lacking in terms of usability (no convenience methods). With this number of drawbacks in mind, I believed it was time to re-work the return types of these methods. The new API for the comm module is: Sender::send(t: T) -> () Sender::send_opt(t: T) -> Result<(), T> SyncSender::send(t: T) -> () SyncSender::send_opt(t: T) -> Result<(), T> SyncSender::try_send(t: T) -> Result<(), TrySendError<T>> Receiver::recv() -> T Receiver::recv_opt() -> Result<T, ()> Receiver::try_recv() -> Result<T, TryRecvError> The notable changes made are: * Sender::try_send => Sender::send_opt. This renaming brings the semantics in line with the SyncSender::send_opt method. An asychronous send only has one failure case, unlike the synchronous try_send method which has two failure cases (full/disconnected). * Sender::send_opt returns the data back to the caller if the send is guaranteed to fail. This method previously returned `bool`, but then it was unable to retrieve the data if the data was guaranteed to fail to send. There is still a race such that when `Ok(())` is returned the data could still fail to be received, but that's inherent to an asynchronous channel. * Result is now the basis of all return values. This not only adds lots of convenience methods to all return values for free, but it also means that you can inspect the return values with no extra imports (Ok/Err are in the prelude). Additionally, it's now self documenting when something failed or not because the return value has "Err" in the name. Things I'm a little uneasy about: * The methods send_opt and recv_opt are not returning options, but rather results. I felt more strongly that Option was the wrong return type than the _opt prefix was wrong, and I coudn't think of a much better name for these methods. One possible way to think about them is to read the _opt suffix as "optionally". * Result<T, ()> is often better expressed as Option<T>. This is only applicable to the recv_opt() method, but I thought it would be more consistent for everything to return Result rather than one method returning an Option. Despite my two reasons to feel uneasy, I feel much better about the consistency in return values at this point, and I think the only real open question is if there's a better suffix for {send,recv}_opt. Closes #11527
2014-04-10 12:53:49 -05:00
let _ = tx2.send_opt(());
}
// Make sure we had multiple reads
assert!(reads > 1);
2013-11-06 13:03:11 -06:00
}
2013-11-06 13:03:11 -06:00
#[test]
fn test_simple_tcp_server_and_client_on_diff_threads() {
let addr = next_test_ip4();
2014-01-26 21:57:42 -06:00
spawn(proc() {
let listener = TcpListener::bind(local_loop(), addr).unwrap();
let mut acceptor = listener.listen().unwrap();
let mut stream = acceptor.accept().unwrap();
let mut buf = [0, .. 2048];
let nread = stream.read(buf).unwrap();
assert_eq!(nread, 8);
for i in range(0u, nread) {
assert_eq!(buf[i], i as u8);
}
2014-01-26 21:57:42 -06:00
});
let mut stream = TcpWatcher::connect(local_loop(), addr, None);
while stream.is_err() {
stream = TcpWatcher::connect(local_loop(), addr, None);
}
stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]).unwrap();
}
2013-11-06 13:03:11 -06:00
#[should_fail] #[test]
fn tcp_listener_fail_cleanup() {
let addr = next_test_ip4();
let w = TcpListener::bind(local_loop(), addr).unwrap();
let _w = w.listen().unwrap();
fail!();
}
#[should_fail] #[test]
fn tcp_stream_fail_cleanup() {
let (tx, rx) = channel();
let addr = next_test_ip4();
2014-01-26 21:57:42 -06:00
spawn(proc() {
let w = TcpListener::bind(local_loop(), addr).unwrap();
let mut w = w.listen().unwrap();
tx.send(());
drop(w.accept().unwrap());
2014-01-26 21:57:42 -06:00
});
rx.recv();
let _w = TcpWatcher::connect(local_loop(), addr, None).unwrap();
fail!();
}
#[should_fail] #[test]
fn udp_listener_fail_cleanup() {
let addr = next_test_ip4();
let _w = UdpWatcher::bind(local_loop(), addr).unwrap();
fail!();
}
#[should_fail] #[test]
fn udp_fail_other_task() {
let addr = next_test_ip4();
let (tx, rx) = channel();
// force the handle to be created on a different scheduler, failure in
// the original task will force a homing operation back to this
// scheduler.
2014-01-26 21:57:42 -06:00
spawn(proc() {
let w = UdpWatcher::bind(local_loop(), addr).unwrap();
tx.send(w);
2014-01-26 21:57:42 -06:00
});
let _w = rx.recv();
fail!();
}
}