rust/src/libnative/io/net.rs

1104 lines
37 KiB
Rust
Raw Normal View History

// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
2013-12-27 19:50:16 -06:00
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use alloc::arc::Arc;
use libc;
2013-12-27 19:50:16 -06:00
use std::mem;
use std::ptr;
2014-06-04 02:00:49 -05:00
use std::rt::mutex;
use std::rt::rtio::{mod, IoResult, IoError};
use std::sync::atomic;
2013-12-27 19:50:16 -06:00
2014-06-04 02:00:49 -05:00
use super::{retry, keep_going};
use super::c;
use super::util;
#[cfg(unix)] use super::process;
#[cfg(unix)] use super::file::FileDesc;
pub use self::os::{init, sock_t, last_error};
2013-12-27 19:50:16 -06:00
2013-12-28 18:40:15 -06:00
////////////////////////////////////////////////////////////////////////////////
// sockaddr and misc bindings
////////////////////////////////////////////////////////////////////////////////
2013-12-27 19:50:16 -06:00
pub fn htons(u: u16) -> u16 {
u.to_be()
2013-12-27 19:50:16 -06:00
}
pub fn ntohs(u: u16) -> u16 {
Int::from_be(u)
2013-12-27 19:50:16 -06:00
}
2013-12-28 18:40:15 -06:00
enum InAddr {
In4Addr(libc::in_addr),
2013-12-28 18:40:15 -06:00
In6Addr(libc::in6_addr),
}
2014-06-04 02:00:49 -05:00
fn ip_to_inaddr(ip: rtio::IpAddr) -> InAddr {
2013-12-28 18:40:15 -06:00
match ip {
2014-06-04 02:00:49 -05:00
rtio::Ipv4Addr(a, b, c, d) => {
2014-05-27 04:00:50 -05:00
let ip = (a as u32 << 24) |
(b as u32 << 16) |
(c as u32 << 8) |
(d as u32 << 0);
In4Addr(libc::in_addr {
s_addr: Int::from_be(ip)
2013-12-28 18:40:15 -06:00
})
}
2014-06-04 02:00:49 -05:00
rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => {
2013-12-28 18:40:15 -06:00
In6Addr(libc::in6_addr {
s6_addr: [
htons(a),
htons(b),
htons(c),
htons(d),
htons(e),
htons(f),
htons(g),
htons(h),
]
})
}
}
}
fn addr_to_sockaddr(addr: rtio::SocketAddr,
storage: &mut libc::sockaddr_storage)
-> libc::socklen_t {
2013-12-27 19:50:16 -06:00
unsafe {
2013-12-28 18:40:15 -06:00
let len = match ip_to_inaddr(addr.ip) {
In4Addr(inaddr) => {
let storage = storage as *mut _ as *mut libc::sockaddr_in;
2013-12-27 19:50:16 -06:00
(*storage).sin_family = libc::AF_INET as libc::sa_family_t;
(*storage).sin_port = htons(addr.port);
2013-12-28 18:40:15 -06:00
(*storage).sin_addr = inaddr;
2013-12-27 19:50:16 -06:00
mem::size_of::<libc::sockaddr_in>()
}
2013-12-28 18:40:15 -06:00
In6Addr(inaddr) => {
let storage = storage as *mut _ as *mut libc::sockaddr_in6;
2013-12-27 19:50:16 -06:00
(*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t;
(*storage).sin6_port = htons(addr.port);
2013-12-28 18:40:15 -06:00
(*storage).sin6_addr = inaddr;
2013-12-27 19:50:16 -06:00
mem::size_of::<libc::sockaddr_in6>()
}
};
return len as libc::socklen_t;
2013-12-27 19:50:16 -06:00
}
}
2014-06-04 02:00:49 -05:00
fn socket(addr: rtio::SocketAddr, ty: libc::c_int) -> IoResult<sock_t> {
2013-12-27 19:50:16 -06:00
unsafe {
let fam = match addr.ip {
2014-06-04 02:00:49 -05:00
rtio::Ipv4Addr(..) => libc::AF_INET,
rtio::Ipv6Addr(..) => libc::AF_INET6,
2013-12-27 19:50:16 -06:00
};
2013-12-28 18:40:15 -06:00
match libc::socket(fam, ty, 0) {
-1 => Err(os::last_error()),
2013-12-27 19:50:16 -06:00
fd => Ok(fd),
}
}
}
2013-12-28 18:40:15 -06:00
fn setsockopt<T>(fd: sock_t, opt: libc::c_int, val: libc::c_int,
payload: T) -> IoResult<()> {
unsafe {
2014-06-25 14:47:34 -05:00
let payload = &payload as *const T as *const libc::c_void;
2013-12-28 18:40:15 -06:00
let ret = libc::setsockopt(fd, opt, val,
payload,
mem::size_of::<T>() as libc::socklen_t);
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
if ret != 0 {
Err(os::last_error())
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
} else {
Ok(())
}
2013-12-28 18:40:15 -06:00
}
}
pub fn getsockopt<T: Copy>(fd: sock_t, opt: libc::c_int,
val: libc::c_int) -> IoResult<T> {
unsafe {
let mut slot: T = mem::zeroed();
let mut len = mem::size_of::<T>() as libc::socklen_t;
let ret = c::getsockopt(fd, opt, val,
&mut slot as *mut _ as *mut _,
&mut len);
if ret != 0 {
Err(os::last_error())
} else {
assert!(len as uint == mem::size_of::<T>());
Ok(slot)
}
}
}
2013-12-27 19:50:16 -06:00
fn sockname(fd: sock_t,
2014-05-09 18:30:57 -05:00
f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr,
2013-12-27 19:50:16 -06:00
*mut libc::socklen_t) -> libc::c_int)
2014-06-04 02:00:49 -05:00
-> IoResult<rtio::SocketAddr>
2013-12-27 19:50:16 -06:00
{
let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
2013-12-27 19:50:16 -06:00
let mut len = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
unsafe {
let storage = &mut storage as *mut libc::sockaddr_storage;
let ret = f(fd,
storage as *mut libc::sockaddr,
&mut len as *mut libc::socklen_t);
if ret != 0 {
return Err(os::last_error())
2013-12-27 19:50:16 -06:00
}
}
2013-12-28 18:40:15 -06:00
return sockaddr_to_addr(&storage, len as uint);
}
pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage,
2014-06-04 02:00:49 -05:00
len: uint) -> IoResult<rtio::SocketAddr> {
2013-12-27 19:50:16 -06:00
match storage.ss_family as libc::c_int {
libc::AF_INET => {
assert!(len as uint >= mem::size_of::<libc::sockaddr_in>());
2013-12-28 18:40:15 -06:00
let storage: &libc::sockaddr_in = unsafe {
core: Remove the cast module This commit revisits the `cast` module in libcore and libstd, and scrutinizes all functions inside of it. The result was to remove the `cast` module entirely, folding all functionality into the `mem` module. Specifically, this is the fate of each function in the `cast` module. * transmute - This function was moved to `mem`, but it is now marked as #[unstable]. This is due to planned changes to the `transmute` function and how it can be invoked (see the #[unstable] comment). For more information, see RFC 5 and #12898 * transmute_copy - This function was moved to `mem`, with clarification that is is not an error to invoke it with T/U that are different sizes, but rather that it is strongly discouraged. This function is now #[stable] * forget - This function was moved to `mem` and marked #[stable] * bump_box_refcount - This function was removed due to the deprecation of managed boxes as well as its questionable utility. * transmute_mut - This function was previously deprecated, and removed as part of this commit. * transmute_mut_unsafe - This function doesn't serve much of a purpose when it can be achieved with an `as` in safe code, so it was removed. * transmute_lifetime - This function was removed because it is likely a strong indication that code is incorrect in the first place. * transmute_mut_lifetime - This function was removed for the same reasons as `transmute_lifetime` * copy_lifetime - This function was moved to `mem`, but it is marked `#[unstable]` now due to the likelihood of being removed in the future if it is found to not be very useful. * copy_mut_lifetime - This function was also moved to `mem`, but had the same treatment as `copy_lifetime`. * copy_lifetime_vec - This function was removed because it is not used today, and its existence is not necessary with DST (copy_lifetime will suffice). In summary, the cast module was stripped down to these functions, and then the functions were moved to the `mem` module. transmute - #[unstable] transmute_copy - #[stable] forget - #[stable] copy_lifetime - #[unstable] copy_mut_lifetime - #[unstable] [breaking-change]
2014-05-09 12:34:51 -05:00
mem::transmute(storage)
2013-12-27 19:50:16 -06:00
};
let ip = (storage.sin_addr.s_addr as u32).to_be();
2014-05-27 04:00:50 -05:00
let a = (ip >> 24) as u8;
let b = (ip >> 16) as u8;
let c = (ip >> 8) as u8;
let d = (ip >> 0) as u8;
2014-06-04 02:00:49 -05:00
Ok(rtio::SocketAddr {
ip: rtio::Ipv4Addr(a, b, c, d),
2013-12-27 19:50:16 -06:00
port: ntohs(storage.sin_port),
})
}
libc::AF_INET6 => {
assert!(len as uint >= mem::size_of::<libc::sockaddr_in6>());
2013-12-28 18:40:15 -06:00
let storage: &libc::sockaddr_in6 = unsafe {
core: Remove the cast module This commit revisits the `cast` module in libcore and libstd, and scrutinizes all functions inside of it. The result was to remove the `cast` module entirely, folding all functionality into the `mem` module. Specifically, this is the fate of each function in the `cast` module. * transmute - This function was moved to `mem`, but it is now marked as #[unstable]. This is due to planned changes to the `transmute` function and how it can be invoked (see the #[unstable] comment). For more information, see RFC 5 and #12898 * transmute_copy - This function was moved to `mem`, with clarification that is is not an error to invoke it with T/U that are different sizes, but rather that it is strongly discouraged. This function is now #[stable] * forget - This function was moved to `mem` and marked #[stable] * bump_box_refcount - This function was removed due to the deprecation of managed boxes as well as its questionable utility. * transmute_mut - This function was previously deprecated, and removed as part of this commit. * transmute_mut_unsafe - This function doesn't serve much of a purpose when it can be achieved with an `as` in safe code, so it was removed. * transmute_lifetime - This function was removed because it is likely a strong indication that code is incorrect in the first place. * transmute_mut_lifetime - This function was removed for the same reasons as `transmute_lifetime` * copy_lifetime - This function was moved to `mem`, but it is marked `#[unstable]` now due to the likelihood of being removed in the future if it is found to not be very useful. * copy_mut_lifetime - This function was also moved to `mem`, but had the same treatment as `copy_lifetime`. * copy_lifetime_vec - This function was removed because it is not used today, and its existence is not necessary with DST (copy_lifetime will suffice). In summary, the cast module was stripped down to these functions, and then the functions were moved to the `mem` module. transmute - #[unstable] transmute_copy - #[stable] forget - #[stable] copy_lifetime - #[unstable] copy_mut_lifetime - #[unstable] [breaking-change]
2014-05-09 12:34:51 -05:00
mem::transmute(storage)
2013-12-27 19:50:16 -06:00
};
let a = ntohs(storage.sin6_addr.s6_addr[0]);
let b = ntohs(storage.sin6_addr.s6_addr[1]);
let c = ntohs(storage.sin6_addr.s6_addr[2]);
let d = ntohs(storage.sin6_addr.s6_addr[3]);
let e = ntohs(storage.sin6_addr.s6_addr[4]);
let f = ntohs(storage.sin6_addr.s6_addr[5]);
let g = ntohs(storage.sin6_addr.s6_addr[6]);
let h = ntohs(storage.sin6_addr.s6_addr[7]);
2014-06-04 02:00:49 -05:00
Ok(rtio::SocketAddr {
ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h),
2013-12-27 19:50:16 -06:00
port: ntohs(storage.sin6_port),
})
}
_ => {
#[cfg(unix)] use libc::EINVAL as ERROR;
#[cfg(windows)] use libc::WSAEINVAL as ERROR;
2014-06-04 02:00:49 -05:00
Err(IoError {
code: ERROR as uint,
extra: 0,
detail: None,
})
2013-12-27 19:50:16 -06:00
}
}
}
2013-12-28 18:40:15 -06:00
////////////////////////////////////////////////////////////////////////////////
// TCP streams
////////////////////////////////////////////////////////////////////////////////
pub struct TcpStream {
inner: Arc<Inner>,
read_deadline: u64,
write_deadline: u64,
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
}
struct Inner {
fd: sock_t,
// Unused on Linux, where this lock is not necessary.
#[allow(dead_code)]
lock: mutex::NativeMutex
}
pub struct Guard<'a> {
pub fd: sock_t,
pub guard: mutex::LockGuard<'a>,
}
impl Inner {
fn new(fd: sock_t) -> Inner {
Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
}
2013-12-28 18:40:15 -06:00
}
2013-12-27 19:50:16 -06:00
impl TcpStream {
2014-06-04 02:00:49 -05:00
pub fn connect(addr: rtio::SocketAddr,
timeout: Option<u64>) -> IoResult<TcpStream> {
let fd = try!(socket(addr, libc::SOCK_STREAM));
let ret = TcpStream::new(Inner::new(fd));
let mut storage = unsafe { mem::zeroed() };
let len = addr_to_sockaddr(addr, &mut storage);
let addrp = &storage as *const _ as *const libc::sockaddr;
match timeout {
Some(timeout) => {
try!(util::connect_timeout(fd, addrp, len, timeout));
Ok(ret)
},
None => {
match retry(|| unsafe { libc::connect(fd, addrp, len) }) {
-1 => Err(os::last_error()),
2013-12-27 19:50:16 -06:00
_ => Ok(ret),
}
}
}
}
fn new(inner: Inner) -> TcpStream {
TcpStream {
inner: Arc::new(inner),
read_deadline: 0,
write_deadline: 0,
}
}
pub fn fd(&self) -> sock_t { self.inner.fd }
2013-12-27 19:50:16 -06:00
fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
2013-12-28 18:40:15 -06:00
nodelay as libc::c_int)
2013-12-27 19:50:16 -06:00
}
fn set_keepalive(&mut self, seconds: Option<uint>) -> IoResult<()> {
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE,
2013-12-28 18:40:15 -06:00
seconds.is_some() as libc::c_int);
match seconds {
Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)),
None => ret,
2013-12-27 19:50:16 -06:00
}
}
#[cfg(target_os = "macos")]
2014-05-05 02:07:49 -05:00
#[cfg(target_os = "ios")]
2013-12-28 18:40:15 -06:00
fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE,
2013-12-28 18:40:15 -06:00
seconds as libc::c_int)
2013-12-27 19:50:16 -06:00
}
#[cfg(target_os = "freebsd")]
#[cfg(target_os = "dragonfly")]
2013-12-28 18:40:15 -06:00
fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> {
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE,
2013-12-28 18:40:15 -06:00
seconds as libc::c_int)
2013-12-27 19:50:16 -06:00
}
#[cfg(not(target_os = "macos"), not(target_os = "ios"), not(target_os = "freebsd"),
not(target_os = "dragonfly"))]
2013-12-28 18:40:15 -06:00
fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> {
2013-12-27 19:50:16 -06:00
Ok(())
}
#[cfg(target_os = "linux")]
fn lock_nonblocking(&self) {}
#[cfg(not(target_os = "linux"))]
fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
let ret = Guard {
fd: self.fd(),
guard: unsafe { self.inner.lock.lock() },
};
assert!(util::set_nonblocking(self.fd(), true).is_ok());
ret
}
2013-12-27 19:50:16 -06:00
}
#[cfg(windows)] type wrlen = libc::c_int;
#[cfg(not(windows))] type wrlen = libc::size_t;
impl rtio::RtioTcpStream for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> {
let fd = self.fd();
let dolock = || self.lock_nonblocking();
let doread = |nb| unsafe {
let flags = if nb {c::MSG_DONTWAIT} else {0};
libc::recv(fd,
buf.as_mut_ptr() as *mut libc::c_void,
buf.len() as wrlen,
flags) as libc::c_int
};
read(fd, self.read_deadline, dolock, doread)
2013-12-27 19:50:16 -06:00
}
2013-12-27 19:50:16 -06:00
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
let fd = self.fd();
let dolock = || self.lock_nonblocking();
2014-06-25 14:47:34 -05:00
let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe {
let flags = if nb {c::MSG_DONTWAIT} else {0};
libc::send(fd,
buf as *const _,
len as wrlen,
flags) as i64
};
match write(fd, self.write_deadline, buf, true, dolock, dowrite) {
Ok(_) => Ok(()),
Err(e) => Err(e)
2013-12-27 19:50:16 -06:00
}
}
2014-06-04 02:00:49 -05:00
fn peer_name(&mut self) -> IoResult<rtio::SocketAddr> {
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
sockname(self.fd(), libc::getpeername)
2013-12-27 19:50:16 -06:00
}
fn control_congestion(&mut self) -> IoResult<()> {
self.set_nodelay(false)
}
fn nodelay(&mut self) -> IoResult<()> {
self.set_nodelay(true)
}
fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()> {
self.set_keepalive(Some(delay_in_seconds))
}
fn letdie(&mut self) -> IoResult<()> {
self.set_keepalive(None)
}
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
2014-06-14 13:03:34 -05:00
fn clone(&self) -> Box<rtio::RtioTcpStream + Send> {
box TcpStream {
inner: self.inner.clone(),
read_deadline: 0,
write_deadline: 0,
2014-06-14 13:03:34 -05:00
} as Box<rtio::RtioTcpStream + Send>
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
}
fn close_write(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
}
fn close_read(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
}
fn set_timeout(&mut self, timeout: Option<u64>) {
let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
self.read_deadline = deadline;
self.write_deadline = deadline;
}
fn set_read_timeout(&mut self, timeout: Option<u64>) {
self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
fn set_write_timeout(&mut self, timeout: Option<u64>) {
self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
2013-12-27 19:50:16 -06:00
}
impl rtio::RtioSocket for TcpStream {
2014-06-04 02:00:49 -05:00
fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
sockname(self.fd(), libc::getsockname)
2013-12-27 19:50:16 -06:00
}
}
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
impl Drop for Inner {
fn drop(&mut self) { unsafe { os::close(self.fd); } }
2013-12-27 19:50:16 -06:00
}
#[unsafe_destructor]
impl<'a> Drop for Guard<'a> {
fn drop(&mut self) {
assert!(util::set_nonblocking(self.fd, false).is_ok());
}
}
2013-12-28 18:40:15 -06:00
////////////////////////////////////////////////////////////////////////////////
// TCP listeners
////////////////////////////////////////////////////////////////////////////////
2013-12-27 19:50:16 -06:00
pub struct TcpListener {
inner: Inner,
2013-12-27 19:50:16 -06:00
}
impl TcpListener {
2014-06-04 02:00:49 -05:00
pub fn bind(addr: rtio::SocketAddr) -> IoResult<TcpListener> {
let fd = try!(socket(addr, libc::SOCK_STREAM));
let ret = TcpListener { inner: Inner::new(fd) };
let mut storage = unsafe { mem::zeroed() };
let len = addr_to_sockaddr(addr, &mut storage);
let addrp = &storage as *const _ as *const libc::sockaddr;
// On platforms with Berkeley-derived sockets, this allows
// to quickly rebind a socket, without needing to wait for
// the OS to clean up the previous one.
if cfg!(unix) {
try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR,
1 as libc::c_int));
}
match unsafe { libc::bind(fd, addrp, len) } {
-1 => Err(os::last_error()),
_ => Ok(ret),
2013-12-27 19:50:16 -06:00
}
}
pub fn fd(&self) -> sock_t { self.inner.fd }
2013-12-27 19:50:16 -06:00
pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
-1 => Err(os::last_error()),
#[cfg(unix)]
_ => {
let (reader, writer) = try!(process::pipe());
try!(util::set_nonblocking(reader.fd(), true));
try!(util::set_nonblocking(writer.fd(), true));
try!(util::set_nonblocking(self.fd(), true));
Ok(TcpAcceptor {
inner: Arc::new(AcceptorInner {
listener: self,
reader: reader,
writer: writer,
closed: atomic::AtomicBool::new(false),
}),
deadline: 0,
})
}
#[cfg(windows)]
_ => {
let accept = try!(os::Event::new());
let ret = unsafe {
c::WSAEventSelect(self.fd(), accept.handle(), c::FD_ACCEPT)
};
if ret != 0 {
return Err(os::last_error())
}
Ok(TcpAcceptor {
inner: Arc::new(AcceptorInner {
listener: self,
abort: try!(os::Event::new()),
accept: accept,
closed: atomic::AtomicBool::new(false),
}),
deadline: 0,
})
}
2013-12-27 19:50:16 -06:00
}
}
}
impl rtio::RtioTcpListener for TcpListener {
fn listen(self: Box<TcpListener>)
-> IoResult<Box<rtio::RtioTcpAcceptor + Send>> {
self.native_listen(128).map(|a| {
2014-06-14 13:03:34 -05:00
box a as Box<rtio::RtioTcpAcceptor + Send>
})
2013-12-27 19:50:16 -06:00
}
}
impl rtio::RtioSocket for TcpListener {
2014-06-04 02:00:49 -05:00
fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
sockname(self.fd(), libc::getsockname)
2013-12-27 19:50:16 -06:00
}
}
pub struct TcpAcceptor {
inner: Arc<AcceptorInner>,
deadline: u64,
2013-12-27 19:50:16 -06:00
}
#[cfg(unix)]
struct AcceptorInner {
listener: TcpListener,
reader: FileDesc,
writer: FileDesc,
closed: atomic::AtomicBool,
}
#[cfg(windows)]
struct AcceptorInner {
listener: TcpListener,
abort: os::Event,
accept: os::Event,
closed: atomic::AtomicBool,
}
2013-12-27 19:50:16 -06:00
impl TcpAcceptor {
pub fn fd(&self) -> sock_t { self.inner.listener.fd() }
2013-12-27 19:50:16 -06:00
#[cfg(unix)]
2013-12-27 19:50:16 -06:00
pub fn native_accept(&mut self) -> IoResult<TcpStream> {
// In implementing accept, the two main concerns are dealing with
// close_accept() and timeouts. The unix implementation is based on a
// nonblocking accept plus a call to select(). Windows ends up having
// an entirely separate implementation than unix, which is explained
// below.
//
// To implement timeouts, all blocking is done via select() instead of
// accept() by putting the socket in non-blocking mode. Because
// select() takes a timeout argument, we just pass through the timeout
// to select().
//
// To implement close_accept(), we have a self-pipe to ourselves which
// is passed to select() along with the socket being accepted on. The
// self-pipe is never written to unless close_accept() is called.
let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
while !self.inner.closed.load(atomic::SeqCst) {
match retry(|| unsafe {
2014-09-14 22:27:36 -05:00
libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut())
}) {
-1 if util::wouldblock() => {}
-1 => return Err(os::last_error()),
fd => return Ok(TcpStream::new(Inner::new(fd as sock_t))),
2013-12-27 19:50:16 -06:00
}
try!(util::await([self.fd(), self.inner.reader.fd()],
deadline, util::Readable));
2013-12-27 19:50:16 -06:00
}
Err(util::eof())
2013-12-27 19:50:16 -06:00
}
#[cfg(windows)]
pub fn native_accept(&mut self) -> IoResult<TcpStream> {
// Unlink unix, windows cannot invoke `select` on arbitrary file
// descriptors like pipes, only sockets. Consequently, windows cannot
// use the same implementation as unix for accept() when close_accept()
// is considered.
//
// In order to implement close_accept() and timeouts, windows uses
// event handles. An acceptor-specific abort event is created which
// will only get set in close_accept(), and it will never be un-set.
// Additionally, another acceptor-specific event is associated with the
// FD_ACCEPT network event.
//
// These two events are then passed to WaitForMultipleEvents to see
// which one triggers first, and the timeout passed to this function is
// the local timeout for the acceptor.
//
// If the wait times out, then the accept timed out. If the wait
// succeeds with the abort event, then we were closed, and if the wait
// succeeds otherwise, then we do a nonblocking poll via `accept` to
// see if we can accept a connection. The connection is candidate to be
// stolen, so we do all of this in a loop as well.
let events = [self.inner.abort.handle(), self.inner.accept.handle()];
while !self.inner.closed.load(atomic::SeqCst) {
let ms = if self.deadline == 0 {
c::WSA_INFINITE as u64
} else {
let now = ::io::timer::now();
if self.deadline < now {0} else {self.deadline - now}
};
let ret = unsafe {
c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE,
ms as libc::DWORD, libc::FALSE)
};
match ret {
c::WSA_WAIT_TIMEOUT => {
return Err(util::timeout("accept timed out"))
}
c::WSA_WAIT_FAILED => return Err(os::last_error()),
c::WSA_WAIT_EVENT_0 => break,
n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1),
}
let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() };
let ret = unsafe {
c::WSAEnumNetworkEvents(self.fd(), events[1], &mut wsaevents)
};
if ret != 0 { return Err(os::last_error()) }
if wsaevents.lNetworkEvents & c::FD_ACCEPT == 0 { continue }
match unsafe {
2014-09-14 22:27:36 -05:00
libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut())
} {
-1 if util::wouldblock() => {}
-1 => return Err(os::last_error()),
// Accepted sockets inherit the same properties as the caller,
// so we need to deregister our event and switch the socket back
// to blocking mode
fd => {
let stream = TcpStream::new(Inner::new(fd));
let ret = unsafe {
c::WSAEventSelect(fd, events[1], 0)
};
if ret != 0 { return Err(os::last_error()) }
try!(util::set_nonblocking(fd, false));
return Ok(stream)
}
}
}
Err(util::eof())
}
2013-12-27 19:50:16 -06:00
}
impl rtio::RtioSocket for TcpAcceptor {
2014-06-04 02:00:49 -05:00
fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
2013-12-28 18:40:15 -06:00
sockname(self.fd(), libc::getsockname)
2013-12-27 19:50:16 -06:00
}
}
impl rtio::RtioTcpAcceptor for TcpAcceptor {
2014-06-14 13:03:34 -05:00
fn accept(&mut self) -> IoResult<Box<rtio::RtioTcpStream + Send>> {
self.native_accept().map(|s| box s as Box<rtio::RtioTcpStream + Send>)
2013-12-27 19:50:16 -06:00
}
fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) }
fn set_timeout(&mut self, timeout: Option<u64>) {
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
fn clone(&self) -> Box<rtio::RtioTcpAcceptor + Send> {
box TcpAcceptor {
inner: self.inner.clone(),
deadline: 0,
} as Box<rtio::RtioTcpAcceptor + Send>
}
#[cfg(unix)]
fn close_accept(&mut self) -> IoResult<()> {
self.inner.closed.store(true, atomic::SeqCst);
let mut fd = FileDesc::new(self.inner.writer.fd(), false);
match fd.inner_write([0]) {
Ok(..) => Ok(()),
Err(..) if util::wouldblock() => Ok(()),
Err(e) => Err(e),
}
}
#[cfg(windows)]
fn close_accept(&mut self) -> IoResult<()> {
self.inner.closed.store(true, atomic::SeqCst);
let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) };
if ret == libc::TRUE {
Ok(())
} else {
Err(os::last_error())
}
}
2013-12-27 19:50:16 -06:00
}
2013-12-28 18:40:15 -06:00
////////////////////////////////////////////////////////////////////////////////
// UDP
////////////////////////////////////////////////////////////////////////////////
pub struct UdpSocket {
inner: Arc<Inner>,
read_deadline: u64,
write_deadline: u64,
2013-12-28 18:40:15 -06:00
}
impl UdpSocket {
2014-06-04 02:00:49 -05:00
pub fn bind(addr: rtio::SocketAddr) -> IoResult<UdpSocket> {
let fd = try!(socket(addr, libc::SOCK_DGRAM));
let ret = UdpSocket {
inner: Arc::new(Inner::new(fd)),
read_deadline: 0,
write_deadline: 0,
};
let mut storage = unsafe { mem::zeroed() };
let len = addr_to_sockaddr(addr, &mut storage);
let addrp = &storage as *const _ as *const libc::sockaddr;
match unsafe { libc::bind(fd, addrp, len) } {
-1 => Err(os::last_error()),
_ => Ok(ret),
2013-12-28 18:40:15 -06:00
}
}
pub fn fd(&self) -> sock_t { self.inner.fd }
2013-12-28 18:40:15 -06:00
pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> {
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST,
2013-12-28 18:40:15 -06:00
on as libc::c_int)
}
pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> {
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP,
2013-12-28 18:40:15 -06:00
on as libc::c_int)
}
2014-06-04 02:00:49 -05:00
pub fn set_membership(&mut self, addr: rtio::IpAddr,
2013-12-28 18:40:15 -06:00
opt: libc::c_int) -> IoResult<()> {
match ip_to_inaddr(addr) {
In4Addr(addr) => {
2013-12-28 18:40:15 -06:00
let mreq = libc::ip_mreq {
imr_multiaddr: addr,
// interface == INADDR_ANY
imr_interface: libc::in_addr { s_addr: 0x0 },
};
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq)
2013-12-28 18:40:15 -06:00
}
In6Addr(addr) => {
let mreq = libc::ip6_mreq {
ipv6mr_multiaddr: addr,
ipv6mr_interface: 0,
};
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq)
2013-12-28 18:40:15 -06:00
}
}
}
#[cfg(target_os = "linux")]
fn lock_nonblocking(&self) {}
#[cfg(not(target_os = "linux"))]
fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
let ret = Guard {
fd: self.fd(),
guard: unsafe { self.inner.lock.lock() },
};
assert!(util::set_nonblocking(self.fd(), true).is_ok());
ret
}
2013-12-28 18:40:15 -06:00
}
impl rtio::RtioSocket for UdpSocket {
2014-06-04 02:00:49 -05:00
fn socket_name(&mut self) -> IoResult<rtio::SocketAddr> {
2013-12-28 18:40:15 -06:00
sockname(self.fd(), libc::getsockname)
}
}
#[cfg(windows)] type msglen_t = libc::c_int;
#[cfg(unix)] type msglen_t = libc::size_t;
impl rtio::RtioUdpSocket for UdpSocket {
fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, rtio::SocketAddr)> {
let fd = self.fd();
let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
let storagep = &mut storage as *mut _ as *mut libc::sockaddr;
let mut addrlen: libc::socklen_t =
mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
let dolock = || self.lock_nonblocking();
let n = try!(read(fd, self.read_deadline, dolock, |nb| unsafe {
let flags = if nb {c::MSG_DONTWAIT} else {0};
libc::recvfrom(fd,
buf.as_mut_ptr() as *mut libc::c_void,
buf.len() as msglen_t,
flags,
storagep,
&mut addrlen) as libc::c_int
}));
sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| {
Ok((n as uint, addr))
})
2013-12-28 18:40:15 -06:00
}
fn send_to(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> IoResult<()> {
let mut storage = unsafe { mem::zeroed() };
let dstlen = addr_to_sockaddr(dst, &mut storage);
let dstp = &storage as *const _ as *const libc::sockaddr;
let fd = self.fd();
let dolock = || self.lock_nonblocking();
2014-06-25 14:47:34 -05:00
let dowrite = |nb, buf: *const u8, len: uint| unsafe {
let flags = if nb {c::MSG_DONTWAIT} else {0};
libc::sendto(fd,
2014-06-25 14:47:34 -05:00
buf as *const libc::c_void,
len as msglen_t,
flags,
dstp,
dstlen) as i64
};
let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite));
if n != buf.len() {
2014-06-04 02:00:49 -05:00
Err(util::short_write(n, "couldn't send entire packet at once"))
} else {
Ok(())
2013-12-28 18:40:15 -06:00
}
}
2014-06-04 02:00:49 -05:00
fn join_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
2013-12-28 18:40:15 -06:00
match multi {
2014-06-04 02:00:49 -05:00
rtio::Ipv4Addr(..) => {
2013-12-28 18:40:15 -06:00
self.set_membership(multi, libc::IP_ADD_MEMBERSHIP)
}
2014-06-04 02:00:49 -05:00
rtio::Ipv6Addr(..) => {
2013-12-28 18:40:15 -06:00
self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP)
}
}
}
2014-06-04 02:00:49 -05:00
fn leave_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> {
2013-12-28 18:40:15 -06:00
match multi {
2014-06-04 02:00:49 -05:00
rtio::Ipv4Addr(..) => {
2013-12-28 18:40:15 -06:00
self.set_membership(multi, libc::IP_DROP_MEMBERSHIP)
}
2014-06-04 02:00:49 -05:00
rtio::Ipv6Addr(..) => {
2013-12-28 18:40:15 -06:00
self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP)
}
}
}
fn loop_multicast_locally(&mut self) -> IoResult<()> {
self.set_multicast_loop(true)
}
fn dont_loop_multicast_locally(&mut self) -> IoResult<()> {
self.set_multicast_loop(false)
}
fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> {
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL,
2013-12-28 18:40:15 -06:00
ttl as libc::c_int)
}
fn time_to_live(&mut self, ttl: int) -> IoResult<()> {
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int)
2013-12-28 18:40:15 -06:00
}
fn hear_broadcasts(&mut self) -> IoResult<()> {
self.set_broadcast(true)
}
fn ignore_broadcasts(&mut self) -> IoResult<()> {
self.set_broadcast(false)
}
2014-06-14 13:03:34 -05:00
fn clone(&self) -> Box<rtio::RtioUdpSocket + Send> {
box UdpSocket {
inner: self.inner.clone(),
read_deadline: 0,
write_deadline: 0,
2014-06-14 13:03:34 -05:00
} as Box<rtio::RtioUdpSocket + Send>
Implement clone() for TCP/UDP/Unix sockets This is part of the overall strategy I would like to take when approaching issue #11165. The only two I/O objects that reasonably want to be "split" are the network stream objects. Everything else can be "split" by just creating another version. The initial idea I had was the literally split the object into a reader and a writer half, but that would just introduce lots of clutter with extra interfaces that were a little unnnecssary, or it would return a ~Reader and a ~Writer which means you couldn't access things like the remote peer name or local socket name. The solution I found to be nicer was to just clone the stream itself. The clone is just a clone of the handle, nothing fancy going on at the kernel level. Conceptually I found this very easy to wrap my head around (everything else supports clone()), and it solved the "split" problem at the same time. The cloning support is pretty specific per platform/lib combination: * native/win32 - uses some specific WSA apis to clone the SOCKET handle * native/unix - uses dup() to get another file descriptor * green/all - This is where things get interesting. When we support full clones of a handle, this implies that we're allowing simultaneous writes and reads to happen. It turns out that libuv doesn't support two simultaneous reads or writes of the same object. It does support *one* read and *one* write at the same time, however. Some extra infrastructure was added to just block concurrent writers/readers until the previous read/write operation was completed. I've added tests to the tcp/unix modules to make sure that this functionality is supported everywhere.
2014-01-22 21:32:16 -06:00
}
fn set_timeout(&mut self, timeout: Option<u64>) {
let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
self.read_deadline = deadline;
self.write_deadline = deadline;
}
fn set_read_timeout(&mut self, timeout: Option<u64>) {
self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
fn set_write_timeout(&mut self, timeout: Option<u64>) {
self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
}
}
////////////////////////////////////////////////////////////////////////////////
// Timeout helpers
//
// The read/write functions below are the helpers for reading/writing a socket
// with a possible deadline specified. This is generally viewed as a timed out
// I/O operation.
//
// From the application's perspective, timeouts apply to the I/O object, not to
// the underlying file descriptor (it's one timeout per object). This means that
// we can't use the SO_RCVTIMEO and corresponding send timeout option.
//
// The next idea to implement timeouts would be to use nonblocking I/O. An
// invocation of select() would wait (with a timeout) for a socket to be ready.
// Once its ready, we can perform the operation. Note that the operation *must*
// be nonblocking, even though select() says the socket is ready. This is
// because some other thread could have come and stolen our data (handles can be
// cloned).
//
// To implement nonblocking I/O, the first option we have is to use the
// O_NONBLOCK flag. Remember though that this is a global setting, affecting all
// I/O objects, so this was initially viewed as unwise.
//
// It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to
// send/recv, but the niftiness wears off once you realize it only works well on
// Linux [1] [2]. This means that it's pretty easy to get a nonblocking
// operation on Linux (no flag fiddling, no affecting other objects), but not on
// other platforms.
//
// To work around this constraint on other platforms, we end up using the
// original strategy of flipping the O_NONBLOCK flag. As mentioned before, this
// could cause other objects' blocking operations to suddenly become
// nonblocking. To get around this, a "blocking operation" which returns EAGAIN
// falls back to using the same code path as nonblocking operations, but with an
// infinite timeout (select + send/recv). This helps emulate blocking
// reads/writes despite the underlying descriptor being nonblocking, as well as
// optimizing the fast path of just hitting one syscall in the good case.
//
// As a final caveat, this implementation uses a mutex so only one thread is
// doing a nonblocking operation at at time. This is the operation that comes
// after the select() (at which point we think the socket is ready). This is
// done for sanity to ensure that the state of the O_NONBLOCK flag is what we
// expect (wouldn't want someone turning it on when it should be off!). All
// operations performed in the lock are *nonblocking* to avoid holding the mutex
// forever.
//
// So, in summary, Linux uses MSG_DONTWAIT and doesn't need mutexes, everyone
// else uses O_NONBLOCK and mutexes with some trickery to make sure blocking
// reads/writes are still blocking.
//
// Fun, fun!
//
// [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html
// [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait
pub fn read<T>(fd: sock_t,
deadline: u64,
lock: || -> T,
read: |bool| -> libc::c_int) -> IoResult<uint> {
let mut ret = -1;
if deadline == 0 {
ret = retry(|| read(false));
}
if deadline != 0 || (ret == -1 && util::wouldblock()) {
let deadline = match deadline {
0 => None,
n => Some(n),
};
loop {
// With a timeout, first we wait for the socket to become
// readable using select(), specifying the relevant timeout for
// our previously set deadline.
try!(util::await([fd], deadline, util::Readable));
// At this point, we're still within the timeout, and we've
// determined that the socket is readable (as returned by
// select). We must still read the socket in *nonblocking* mode
// because some other thread could come steal our data. If we
// fail to read some data, we retry (hence the outer loop) and
// wait for the socket to become readable again.
let _guard = lock();
match retry(|| read(deadline.is_some())) {
-1 if util::wouldblock() => {}
-1 => return Err(os::last_error()),
n => { ret = n; break }
}
}
}
match ret {
2014-06-04 02:00:49 -05:00
0 => Err(util::eof()),
n if n < 0 => Err(os::last_error()),
n => Ok(n as uint)
}
}
pub fn write<T>(fd: sock_t,
deadline: u64,
buf: &[u8],
write_everything: bool,
lock: || -> T,
2014-06-25 14:47:34 -05:00
write: |bool, *const u8, uint| -> i64) -> IoResult<uint> {
let mut ret = -1;
let mut written = 0;
if deadline == 0 {
if write_everything {
ret = keep_going(buf, |inner, len| {
written = buf.len() - len;
write(false, inner, len)
});
} else {
ret = retry(|| { write(false, buf.as_ptr(), buf.len()) });
if ret > 0 { written = ret as uint; }
}
}
if deadline != 0 || (ret == -1 && util::wouldblock()) {
let deadline = match deadline {
0 => None,
n => Some(n),
};
while written < buf.len() && (write_everything || written == 0) {
// As with read(), first wait for the socket to be ready for
// the I/O operation.
match util::await([fd], deadline, util::Writable) {
2014-06-04 02:00:49 -05:00
Err(ref e) if e.code == libc::EOF as uint && written > 0 => {
assert!(deadline.is_some());
2014-06-04 02:00:49 -05:00
return Err(util::short_write(written, "short write"))
}
Err(e) => return Err(e),
Ok(()) => {}
}
// Also as with read(), we use MSG_DONTWAIT to guard ourselves
2014-07-02 20:27:07 -05:00
// against unforeseen circumstances.
let _guard = lock();
let ptr = buf.slice_from(written).as_ptr();
let len = buf.len() - written;
match retry(|| write(deadline.is_some(), ptr, len)) {
-1 if util::wouldblock() => {}
-1 => return Err(os::last_error()),
n => { written += n as uint; }
}
}
ret = 0;
}
if ret < 0 {
Err(os::last_error())
} else {
Ok(written)
}
2013-12-28 18:40:15 -06:00
}
#[cfg(windows)]
mod os {
use libc;
use std::mem;
use std::rt::rtio::{IoError, IoResult};
use io::c;
pub type sock_t = libc::SOCKET;
pub struct Event(c::WSAEVENT);
impl Event {
pub fn new() -> IoResult<Event> {
let event = unsafe { c::WSACreateEvent() };
if event == c::WSA_INVALID_EVENT {
Err(last_error())
} else {
Ok(Event(event))
}
}
pub fn handle(&self) -> c::WSAEVENT { let Event(handle) = *self; handle }
}
impl Drop for Event {
fn drop(&mut self) {
unsafe { let _ = c::WSACloseEvent(self.handle()); }
}
}
pub fn init() {
unsafe {
use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
static mut INITIALIZED: bool = false;
static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
let _guard = LOCK.lock();
if !INITIALIZED {
let mut data: c::WSADATA = mem::zeroed();
let ret = c::WSAStartup(0x202, // version 2.2
&mut data);
assert_eq!(ret, 0);
INITIALIZED = true;
}
}
}
pub fn last_error() -> IoError {
use std::os;
let code = unsafe { c::WSAGetLastError() as uint };
IoError {
code: code,
extra: 0,
detail: Some(os::error_string(code)),
}
}
pub unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); }
}
#[cfg(unix)]
mod os {
use libc;
use std::rt::rtio::IoError;
use io;
pub type sock_t = io::file::fd_t;
pub fn init() {}
pub fn last_error() -> IoError { io::last_error() }
pub unsafe fn close(sock: sock_t) { let _ = libc::close(sock); }
}