// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. use alloc::arc::Arc; use libc; use std::mem; use std::ptr; use std::rt::mutex; use std::rt::rtio::{mod, IoResult, IoError}; use std::sync::atomic; use super::{retry, keep_going}; use super::c; use super::util; #[cfg(unix)] use super::process; #[cfg(unix)] use super::file::FileDesc; pub use self::os::{init, sock_t, last_error}; //////////////////////////////////////////////////////////////////////////////// // sockaddr and misc bindings //////////////////////////////////////////////////////////////////////////////// pub fn htons(u: u16) -> u16 { u.to_be() } pub fn ntohs(u: u16) -> u16 { Int::from_be(u) } enum InAddr { InAddr(libc::in_addr), In6Addr(libc::in6_addr), } fn ip_to_inaddr(ip: rtio::IpAddr) -> InAddr { match ip { rtio::Ipv4Addr(a, b, c, d) => { let ip = (a as u32 << 24) | (b as u32 << 16) | (c as u32 << 8) | (d as u32 << 0); InAddr(libc::in_addr { s_addr: Int::from_be(ip) }) } rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => { In6Addr(libc::in6_addr { s6_addr: [ htons(a), htons(b), htons(c), htons(d), htons(e), htons(f), htons(g), htons(h), ] }) } } } fn addr_to_sockaddr(addr: rtio::SocketAddr, storage: &mut libc::sockaddr_storage) -> libc::socklen_t { unsafe { let len = match ip_to_inaddr(addr.ip) { InAddr(inaddr) => { let storage = storage as *mut _ as *mut libc::sockaddr_in; (*storage).sin_family = libc::AF_INET as libc::sa_family_t; (*storage).sin_port = htons(addr.port); (*storage).sin_addr = inaddr; mem::size_of::() } In6Addr(inaddr) => { let storage = storage as *mut _ as *mut libc::sockaddr_in6; (*storage).sin6_family = libc::AF_INET6 as libc::sa_family_t; (*storage).sin6_port = htons(addr.port); (*storage).sin6_addr = inaddr; mem::size_of::() } }; return len as libc::socklen_t; } } fn socket(addr: rtio::SocketAddr, ty: libc::c_int) -> IoResult { unsafe { let fam = match addr.ip { rtio::Ipv4Addr(..) => libc::AF_INET, rtio::Ipv6Addr(..) => libc::AF_INET6, }; match libc::socket(fam, ty, 0) { -1 => Err(os::last_error()), fd => Ok(fd), } } } fn setsockopt(fd: sock_t, opt: libc::c_int, val: libc::c_int, payload: T) -> IoResult<()> { unsafe { let payload = &payload as *const T as *const libc::c_void; let ret = libc::setsockopt(fd, opt, val, payload, mem::size_of::() as libc::socklen_t); if ret != 0 { Err(os::last_error()) } else { Ok(()) } } } pub fn getsockopt(fd: sock_t, opt: libc::c_int, val: libc::c_int) -> IoResult { unsafe { let mut slot: T = mem::zeroed(); let mut len = mem::size_of::() as libc::socklen_t; let ret = c::getsockopt(fd, opt, val, &mut slot as *mut _ as *mut _, &mut len); if ret != 0 { Err(os::last_error()) } else { assert!(len as uint == mem::size_of::()); Ok(slot) } } } fn sockname(fd: sock_t, f: unsafe extern "system" fn(sock_t, *mut libc::sockaddr, *mut libc::socklen_t) -> libc::c_int) -> IoResult { let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; let mut len = mem::size_of::() as libc::socklen_t; unsafe { let storage = &mut storage as *mut libc::sockaddr_storage; let ret = f(fd, storage as *mut libc::sockaddr, &mut len as *mut libc::socklen_t); if ret != 0 { return Err(os::last_error()) } } return sockaddr_to_addr(&storage, len as uint); } pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage, len: uint) -> IoResult { match storage.ss_family as libc::c_int { libc::AF_INET => { assert!(len as uint >= mem::size_of::()); let storage: &libc::sockaddr_in = unsafe { mem::transmute(storage) }; let ip = (storage.sin_addr.s_addr as u32).to_be(); let a = (ip >> 24) as u8; let b = (ip >> 16) as u8; let c = (ip >> 8) as u8; let d = (ip >> 0) as u8; Ok(rtio::SocketAddr { ip: rtio::Ipv4Addr(a, b, c, d), port: ntohs(storage.sin_port), }) } libc::AF_INET6 => { assert!(len as uint >= mem::size_of::()); let storage: &libc::sockaddr_in6 = unsafe { mem::transmute(storage) }; let a = ntohs(storage.sin6_addr.s6_addr[0]); let b = ntohs(storage.sin6_addr.s6_addr[1]); let c = ntohs(storage.sin6_addr.s6_addr[2]); let d = ntohs(storage.sin6_addr.s6_addr[3]); let e = ntohs(storage.sin6_addr.s6_addr[4]); let f = ntohs(storage.sin6_addr.s6_addr[5]); let g = ntohs(storage.sin6_addr.s6_addr[6]); let h = ntohs(storage.sin6_addr.s6_addr[7]); Ok(rtio::SocketAddr { ip: rtio::Ipv6Addr(a, b, c, d, e, f, g, h), port: ntohs(storage.sin6_port), }) } _ => { #[cfg(unix)] use libc::EINVAL as ERROR; #[cfg(windows)] use libc::WSAEINVAL as ERROR; Err(IoError { code: ERROR as uint, extra: 0, detail: None, }) } } } //////////////////////////////////////////////////////////////////////////////// // TCP streams //////////////////////////////////////////////////////////////////////////////// pub struct TcpStream { inner: Arc, read_deadline: u64, write_deadline: u64, } struct Inner { fd: sock_t, // Unused on Linux, where this lock is not necessary. #[allow(dead_code)] lock: mutex::NativeMutex } pub struct Guard<'a> { pub fd: sock_t, pub guard: mutex::LockGuard<'a>, } impl Inner { fn new(fd: sock_t) -> Inner { Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } } } } impl TcpStream { pub fn connect(addr: rtio::SocketAddr, timeout: Option) -> IoResult { let fd = try!(socket(addr, libc::SOCK_STREAM)); let ret = TcpStream::new(Inner::new(fd)); let mut storage = unsafe { mem::zeroed() }; let len = addr_to_sockaddr(addr, &mut storage); let addrp = &storage as *const _ as *const libc::sockaddr; match timeout { Some(timeout) => { try!(util::connect_timeout(fd, addrp, len, timeout)); Ok(ret) }, None => { match retry(|| unsafe { libc::connect(fd, addrp, len) }) { -1 => Err(os::last_error()), _ => Ok(ret), } } } } fn new(inner: Inner) -> TcpStream { TcpStream { inner: Arc::new(inner), read_deadline: 0, write_deadline: 0, } } pub fn fd(&self) -> sock_t { self.inner.fd } fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY, nodelay as libc::c_int) } fn set_keepalive(&mut self, seconds: Option) -> IoResult<()> { let ret = setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_KEEPALIVE, seconds.is_some() as libc::c_int); match seconds { Some(n) => ret.and_then(|()| self.set_tcp_keepalive(n)), None => ret, } } #[cfg(target_os = "macos")] #[cfg(target_os = "ios")] fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> { setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE, seconds as libc::c_int) } #[cfg(target_os = "freebsd")] #[cfg(target_os = "dragonfly")] fn set_tcp_keepalive(&mut self, seconds: uint) -> IoResult<()> { setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_KEEPIDLE, seconds as libc::c_int) } #[cfg(not(target_os = "macos"), not(target_os = "ios"), not(target_os = "freebsd"), not(target_os = "dragonfly"))] fn set_tcp_keepalive(&mut self, _seconds: uint) -> IoResult<()> { Ok(()) } #[cfg(target_os = "linux")] fn lock_nonblocking(&self) {} #[cfg(not(target_os = "linux"))] fn lock_nonblocking<'a>(&'a self) -> Guard<'a> { let ret = Guard { fd: self.fd(), guard: unsafe { self.inner.lock.lock() }, }; assert!(util::set_nonblocking(self.fd(), true).is_ok()); ret } } #[cfg(windows)] type wrlen = libc::c_int; #[cfg(not(windows))] type wrlen = libc::size_t; impl rtio::RtioTcpStream for TcpStream { fn read(&mut self, buf: &mut [u8]) -> IoResult { let fd = self.fd(); let dolock = || self.lock_nonblocking(); let doread = |nb| unsafe { let flags = if nb {c::MSG_DONTWAIT} else {0}; libc::recv(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len() as wrlen, flags) as libc::c_int }; read(fd, self.read_deadline, dolock, doread) } fn write(&mut self, buf: &[u8]) -> IoResult<()> { let fd = self.fd(); let dolock = || self.lock_nonblocking(); let dowrite = |nb: bool, buf: *const u8, len: uint| unsafe { let flags = if nb {c::MSG_DONTWAIT} else {0}; libc::send(fd, buf as *mut libc::c_void, len as wrlen, flags) as i64 }; match write(fd, self.write_deadline, buf, true, dolock, dowrite) { Ok(_) => Ok(()), Err(e) => Err(e) } } fn peer_name(&mut self) -> IoResult { sockname(self.fd(), libc::getpeername) } fn control_congestion(&mut self) -> IoResult<()> { self.set_nodelay(false) } fn nodelay(&mut self) -> IoResult<()> { self.set_nodelay(true) } fn keepalive(&mut self, delay_in_seconds: uint) -> IoResult<()> { self.set_keepalive(Some(delay_in_seconds)) } fn letdie(&mut self) -> IoResult<()> { self.set_keepalive(None) } fn clone(&self) -> Box { box TcpStream { inner: self.inner.clone(), read_deadline: 0, write_deadline: 0, } as Box } fn close_write(&mut self) -> IoResult<()> { super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) }) } fn close_read(&mut self) -> IoResult<()> { super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) }) } fn set_timeout(&mut self, timeout: Option) { let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); self.read_deadline = deadline; self.write_deadline = deadline; } fn set_read_timeout(&mut self, timeout: Option) { self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); } fn set_write_timeout(&mut self, timeout: Option) { self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); } } impl rtio::RtioSocket for TcpStream { fn socket_name(&mut self) -> IoResult { sockname(self.fd(), libc::getsockname) } } impl Drop for Inner { fn drop(&mut self) { unsafe { os::close(self.fd); } } } #[unsafe_destructor] impl<'a> Drop for Guard<'a> { fn drop(&mut self) { assert!(util::set_nonblocking(self.fd, false).is_ok()); } } //////////////////////////////////////////////////////////////////////////////// // TCP listeners //////////////////////////////////////////////////////////////////////////////// pub struct TcpListener { inner: Inner, } impl TcpListener { pub fn bind(addr: rtio::SocketAddr) -> IoResult { let fd = try!(socket(addr, libc::SOCK_STREAM)); let ret = TcpListener { inner: Inner::new(fd) }; let mut storage = unsafe { mem::zeroed() }; let len = addr_to_sockaddr(addr, &mut storage); let addrp = &storage as *const _ as *const libc::sockaddr; // On platforms with Berkeley-derived sockets, this allows // to quickly rebind a socket, without needing to wait for // the OS to clean up the previous one. if cfg!(unix) { try!(setsockopt(fd, libc::SOL_SOCKET, libc::SO_REUSEADDR, 1 as libc::c_int)); } match unsafe { libc::bind(fd, addrp, len) } { -1 => Err(os::last_error()), _ => Ok(ret), } } pub fn fd(&self) -> sock_t { self.inner.fd } pub fn native_listen(self, backlog: int) -> IoResult { match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { -1 => Err(os::last_error()), #[cfg(unix)] _ => { let (reader, writer) = try!(process::pipe()); try!(util::set_nonblocking(reader.fd(), true)); try!(util::set_nonblocking(writer.fd(), true)); try!(util::set_nonblocking(self.fd(), true)); Ok(TcpAcceptor { inner: Arc::new(AcceptorInner { listener: self, reader: reader, writer: writer, closed: atomic::AtomicBool::new(false), }), deadline: 0, }) } #[cfg(windows)] _ => { let accept = try!(os::Event::new()); let ret = unsafe { c::WSAEventSelect(self.fd(), accept.handle(), c::FD_ACCEPT) }; if ret != 0 { return Err(os::last_error()) } Ok(TcpAcceptor { inner: Arc::new(AcceptorInner { listener: self, abort: try!(os::Event::new()), accept: accept, closed: atomic::AtomicBool::new(false), }), deadline: 0, }) } } } } impl rtio::RtioTcpListener for TcpListener { fn listen(self: Box) -> IoResult> { self.native_listen(128).map(|a| { box a as Box }) } } impl rtio::RtioSocket for TcpListener { fn socket_name(&mut self) -> IoResult { sockname(self.fd(), libc::getsockname) } } pub struct TcpAcceptor { inner: Arc, deadline: u64, } #[cfg(unix)] struct AcceptorInner { listener: TcpListener, reader: FileDesc, writer: FileDesc, closed: atomic::AtomicBool, } #[cfg(windows)] struct AcceptorInner { listener: TcpListener, abort: os::Event, accept: os::Event, closed: atomic::AtomicBool, } impl TcpAcceptor { pub fn fd(&self) -> sock_t { self.inner.listener.fd() } #[cfg(unix)] pub fn native_accept(&mut self) -> IoResult { // In implementing accept, the two main concerns are dealing with // close_accept() and timeouts. The unix implementation is based on a // nonblocking accept plus a call to select(). Windows ends up having // an entirely separate implementation than unix, which is explained // below. // // To implement timeouts, all blocking is done via select() instead of // accept() by putting the socket in non-blocking mode. Because // select() takes a timeout argument, we just pass through the timeout // to select(). // // To implement close_accept(), we have a self-pipe to ourselves which // is passed to select() along with the socket being accepted on. The // self-pipe is never written to unless close_accept() is called. let deadline = if self.deadline == 0 {None} else {Some(self.deadline)}; while !self.inner.closed.load(atomic::SeqCst) { match retry(|| unsafe { libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut()) }) { -1 if util::wouldblock() => {} -1 => return Err(os::last_error()), fd => return Ok(TcpStream::new(Inner::new(fd as sock_t))), } try!(util::await([self.fd(), self.inner.reader.fd()], deadline, util::Readable)); } Err(util::eof()) } #[cfg(windows)] pub fn native_accept(&mut self) -> IoResult { // Unlink unix, windows cannot invoke `select` on arbitrary file // descriptors like pipes, only sockets. Consequently, windows cannot // use the same implementation as unix for accept() when close_accept() // is considered. // // In order to implement close_accept() and timeouts, windows uses // event handles. An acceptor-specific abort event is created which // will only get set in close_accept(), and it will never be un-set. // Additionally, another acceptor-specific event is associated with the // FD_ACCEPT network event. // // These two events are then passed to WaitForMultipleEvents to see // which one triggers first, and the timeout passed to this function is // the local timeout for the acceptor. // // If the wait times out, then the accept timed out. If the wait // succeeds with the abort event, then we were closed, and if the wait // succeeds otherwise, then we do a nonblocking poll via `accept` to // see if we can accept a connection. The connection is candidate to be // stolen, so we do all of this in a loop as well. let events = [self.inner.abort.handle(), self.inner.accept.handle()]; while !self.inner.closed.load(atomic::SeqCst) { let ms = if self.deadline == 0 { c::WSA_INFINITE as u64 } else { let now = ::io::timer::now(); if self.deadline < now {0} else {self.deadline - now} }; let ret = unsafe { c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE, ms as libc::DWORD, libc::FALSE) }; match ret { c::WSA_WAIT_TIMEOUT => { return Err(util::timeout("accept timed out")) } c::WSA_WAIT_FAILED => return Err(os::last_error()), c::WSA_WAIT_EVENT_0 => break, n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1), } let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() }; let ret = unsafe { c::WSAEnumNetworkEvents(self.fd(), events[1], &mut wsaevents) }; if ret != 0 { return Err(os::last_error()) } if wsaevents.lNetworkEvents & c::FD_ACCEPT == 0 { continue } match unsafe { libc::accept(self.fd(), ptr::null_mut(), ptr::null_mut()) } { -1 if util::wouldblock() => {} -1 => return Err(os::last_error()), // Accepted sockets inherit the same properties as the caller, // so we need to deregister our event and switch the socket back // to blocking mode fd => { let stream = TcpStream::new(Inner::new(fd)); let ret = unsafe { c::WSAEventSelect(fd, events[1], 0) }; if ret != 0 { return Err(os::last_error()) } try!(util::set_nonblocking(fd, false)); return Ok(stream) } } } Err(util::eof()) } } impl rtio::RtioSocket for TcpAcceptor { fn socket_name(&mut self) -> IoResult { sockname(self.fd(), libc::getsockname) } } impl rtio::RtioTcpAcceptor for TcpAcceptor { fn accept(&mut self) -> IoResult> { self.native_accept().map(|s| box s as Box) } fn accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) } fn dont_accept_simultaneously(&mut self) -> IoResult<()> { Ok(()) } fn set_timeout(&mut self, timeout: Option) { self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); } fn clone(&self) -> Box { box TcpAcceptor { inner: self.inner.clone(), deadline: 0, } as Box } #[cfg(unix)] fn close_accept(&mut self) -> IoResult<()> { self.inner.closed.store(true, atomic::SeqCst); let mut fd = FileDesc::new(self.inner.writer.fd(), false); match fd.inner_write([0]) { Ok(..) => Ok(()), Err(..) if util::wouldblock() => Ok(()), Err(e) => Err(e), } } #[cfg(windows)] fn close_accept(&mut self) -> IoResult<()> { self.inner.closed.store(true, atomic::SeqCst); let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) }; if ret == libc::TRUE { Ok(()) } else { Err(os::last_error()) } } } //////////////////////////////////////////////////////////////////////////////// // UDP //////////////////////////////////////////////////////////////////////////////// pub struct UdpSocket { inner: Arc, read_deadline: u64, write_deadline: u64, } impl UdpSocket { pub fn bind(addr: rtio::SocketAddr) -> IoResult { let fd = try!(socket(addr, libc::SOCK_DGRAM)); let ret = UdpSocket { inner: Arc::new(Inner::new(fd)), read_deadline: 0, write_deadline: 0, }; let mut storage = unsafe { mem::zeroed() }; let len = addr_to_sockaddr(addr, &mut storage); let addrp = &storage as *const _ as *const libc::sockaddr; match unsafe { libc::bind(fd, addrp, len) } { -1 => Err(os::last_error()), _ => Ok(ret), } } pub fn fd(&self) -> sock_t { self.inner.fd } pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> { setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST, on as libc::c_int) } pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> { setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_LOOP, on as libc::c_int) } pub fn set_membership(&mut self, addr: rtio::IpAddr, opt: libc::c_int) -> IoResult<()> { match ip_to_inaddr(addr) { InAddr(addr) => { let mreq = libc::ip_mreq { imr_multiaddr: addr, // interface == INADDR_ANY imr_interface: libc::in_addr { s_addr: 0x0 }, }; setsockopt(self.fd(), libc::IPPROTO_IP, opt, mreq) } In6Addr(addr) => { let mreq = libc::ip6_mreq { ipv6mr_multiaddr: addr, ipv6mr_interface: 0, }; setsockopt(self.fd(), libc::IPPROTO_IPV6, opt, mreq) } } } #[cfg(target_os = "linux")] fn lock_nonblocking(&self) {} #[cfg(not(target_os = "linux"))] fn lock_nonblocking<'a>(&'a self) -> Guard<'a> { let ret = Guard { fd: self.fd(), guard: unsafe { self.inner.lock.lock() }, }; assert!(util::set_nonblocking(self.fd(), true).is_ok()); ret } } impl rtio::RtioSocket for UdpSocket { fn socket_name(&mut self) -> IoResult { sockname(self.fd(), libc::getsockname) } } #[cfg(windows)] type msglen_t = libc::c_int; #[cfg(unix)] type msglen_t = libc::size_t; impl rtio::RtioUdpSocket for UdpSocket { fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, rtio::SocketAddr)> { let fd = self.fd(); let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() }; let storagep = &mut storage as *mut _ as *mut libc::sockaddr; let mut addrlen: libc::socklen_t = mem::size_of::() as libc::socklen_t; let dolock = || self.lock_nonblocking(); let n = try!(read(fd, self.read_deadline, dolock, |nb| unsafe { let flags = if nb {c::MSG_DONTWAIT} else {0}; libc::recvfrom(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len() as msglen_t, flags, storagep, &mut addrlen) as libc::c_int })); sockaddr_to_addr(&storage, addrlen as uint).and_then(|addr| { Ok((n as uint, addr)) }) } fn send_to(&mut self, buf: &[u8], dst: rtio::SocketAddr) -> IoResult<()> { let mut storage = unsafe { mem::zeroed() }; let dstlen = addr_to_sockaddr(dst, &mut storage); let dstp = &storage as *const _ as *const libc::sockaddr; let fd = self.fd(); let dolock = || self.lock_nonblocking(); let dowrite = |nb, buf: *const u8, len: uint| unsafe { let flags = if nb {c::MSG_DONTWAIT} else {0}; libc::sendto(fd, buf as *const libc::c_void, len as msglen_t, flags, dstp, dstlen) as i64 }; let n = try!(write(fd, self.write_deadline, buf, false, dolock, dowrite)); if n != buf.len() { Err(util::short_write(n, "couldn't send entire packet at once")) } else { Ok(()) } } fn join_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> { match multi { rtio::Ipv4Addr(..) => { self.set_membership(multi, libc::IP_ADD_MEMBERSHIP) } rtio::Ipv6Addr(..) => { self.set_membership(multi, libc::IPV6_ADD_MEMBERSHIP) } } } fn leave_multicast(&mut self, multi: rtio::IpAddr) -> IoResult<()> { match multi { rtio::Ipv4Addr(..) => { self.set_membership(multi, libc::IP_DROP_MEMBERSHIP) } rtio::Ipv6Addr(..) => { self.set_membership(multi, libc::IPV6_DROP_MEMBERSHIP) } } } fn loop_multicast_locally(&mut self) -> IoResult<()> { self.set_multicast_loop(true) } fn dont_loop_multicast_locally(&mut self) -> IoResult<()> { self.set_multicast_loop(false) } fn multicast_time_to_live(&mut self, ttl: int) -> IoResult<()> { setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_MULTICAST_TTL, ttl as libc::c_int) } fn time_to_live(&mut self, ttl: int) -> IoResult<()> { setsockopt(self.fd(), libc::IPPROTO_IP, libc::IP_TTL, ttl as libc::c_int) } fn hear_broadcasts(&mut self) -> IoResult<()> { self.set_broadcast(true) } fn ignore_broadcasts(&mut self) -> IoResult<()> { self.set_broadcast(false) } fn clone(&self) -> Box { box UdpSocket { inner: self.inner.clone(), read_deadline: 0, write_deadline: 0, } as Box } fn set_timeout(&mut self, timeout: Option) { let deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); self.read_deadline = deadline; self.write_deadline = deadline; } fn set_read_timeout(&mut self, timeout: Option) { self.read_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); } fn set_write_timeout(&mut self, timeout: Option) { self.write_deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0); } } //////////////////////////////////////////////////////////////////////////////// // Timeout helpers // // The read/write functions below are the helpers for reading/writing a socket // with a possible deadline specified. This is generally viewed as a timed out // I/O operation. // // From the application's perspective, timeouts apply to the I/O object, not to // the underlying file descriptor (it's one timeout per object). This means that // we can't use the SO_RCVTIMEO and corresponding send timeout option. // // The next idea to implement timeouts would be to use nonblocking I/O. An // invocation of select() would wait (with a timeout) for a socket to be ready. // Once its ready, we can perform the operation. Note that the operation *must* // be nonblocking, even though select() says the socket is ready. This is // because some other thread could have come and stolen our data (handles can be // cloned). // // To implement nonblocking I/O, the first option we have is to use the // O_NONBLOCK flag. Remember though that this is a global setting, affecting all // I/O objects, so this was initially viewed as unwise. // // It turns out that there's this nifty MSG_DONTWAIT flag which can be passed to // send/recv, but the niftiness wears off once you realize it only works well on // Linux [1] [2]. This means that it's pretty easy to get a nonblocking // operation on Linux (no flag fiddling, no affecting other objects), but not on // other platforms. // // To work around this constraint on other platforms, we end up using the // original strategy of flipping the O_NONBLOCK flag. As mentioned before, this // could cause other objects' blocking operations to suddenly become // nonblocking. To get around this, a "blocking operation" which returns EAGAIN // falls back to using the same code path as nonblocking operations, but with an // infinite timeout (select + send/recv). This helps emulate blocking // reads/writes despite the underlying descriptor being nonblocking, as well as // optimizing the fast path of just hitting one syscall in the good case. // // As a final caveat, this implementation uses a mutex so only one thread is // doing a nonblocking operation at at time. This is the operation that comes // after the select() (at which point we think the socket is ready). This is // done for sanity to ensure that the state of the O_NONBLOCK flag is what we // expect (wouldn't want someone turning it on when it should be off!). All // operations performed in the lock are *nonblocking* to avoid holding the mutex // forever. // // So, in summary, Linux uses MSG_DONTWAIT and doesn't need mutexes, everyone // else uses O_NONBLOCK and mutexes with some trickery to make sure blocking // reads/writes are still blocking. // // Fun, fun! // // [1] http://twistedmatrix.com/pipermail/twisted-commits/2012-April/034692.html // [2] http://stackoverflow.com/questions/19819198/does-send-msg-dontwait pub fn read(fd: sock_t, deadline: u64, lock: || -> T, read: |bool| -> libc::c_int) -> IoResult { let mut ret = -1; if deadline == 0 { ret = retry(|| read(false)); } if deadline != 0 || (ret == -1 && util::wouldblock()) { let deadline = match deadline { 0 => None, n => Some(n), }; loop { // With a timeout, first we wait for the socket to become // readable using select(), specifying the relevant timeout for // our previously set deadline. try!(util::await([fd], deadline, util::Readable)); // At this point, we're still within the timeout, and we've // determined that the socket is readable (as returned by // select). We must still read the socket in *nonblocking* mode // because some other thread could come steal our data. If we // fail to read some data, we retry (hence the outer loop) and // wait for the socket to become readable again. let _guard = lock(); match retry(|| read(deadline.is_some())) { -1 if util::wouldblock() => {} -1 => return Err(os::last_error()), n => { ret = n; break } } } } match ret { 0 => Err(util::eof()), n if n < 0 => Err(os::last_error()), n => Ok(n as uint) } } pub fn write(fd: sock_t, deadline: u64, buf: &[u8], write_everything: bool, lock: || -> T, write: |bool, *const u8, uint| -> i64) -> IoResult { let mut ret = -1; let mut written = 0; if deadline == 0 { if write_everything { ret = keep_going(buf, |inner, len| { written = buf.len() - len; write(false, inner, len) }); } else { ret = retry(|| { write(false, buf.as_ptr(), buf.len()) }); if ret > 0 { written = ret as uint; } } } if deadline != 0 || (ret == -1 && util::wouldblock()) { let deadline = match deadline { 0 => None, n => Some(n), }; while written < buf.len() && (write_everything || written == 0) { // As with read(), first wait for the socket to be ready for // the I/O operation. match util::await([fd], deadline, util::Writable) { Err(ref e) if e.code == libc::EOF as uint && written > 0 => { assert!(deadline.is_some()); return Err(util::short_write(written, "short write")) } Err(e) => return Err(e), Ok(()) => {} } // Also as with read(), we use MSG_DONTWAIT to guard ourselves // against unforeseen circumstances. let _guard = lock(); let ptr = buf.slice_from(written).as_ptr(); let len = buf.len() - written; match retry(|| write(deadline.is_some(), ptr, len)) { -1 if util::wouldblock() => {} -1 => return Err(os::last_error()), n => { written += n as uint; } } } ret = 0; } if ret < 0 { Err(os::last_error()) } else { Ok(written) } } #[cfg(windows)] mod os { use libc; use std::mem; use std::rt::rtio::{IoError, IoResult}; use io::c; pub type sock_t = libc::SOCKET; pub struct Event(c::WSAEVENT); impl Event { pub fn new() -> IoResult { let event = unsafe { c::WSACreateEvent() }; if event == c::WSA_INVALID_EVENT { Err(last_error()) } else { Ok(Event(event)) } } pub fn handle(&self) -> c::WSAEVENT { let Event(handle) = *self; handle } } impl Drop for Event { fn drop(&mut self) { unsafe { let _ = c::WSACloseEvent(self.handle()); } } } pub fn init() { unsafe { use std::rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; static mut INITIALIZED: bool = false; static mut LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT; let _guard = LOCK.lock(); if !INITIALIZED { let mut data: c::WSADATA = mem::zeroed(); let ret = c::WSAStartup(0x202, // version 2.2 &mut data); assert_eq!(ret, 0); INITIALIZED = true; } } } pub fn last_error() -> IoError { use std::os; let code = unsafe { c::WSAGetLastError() as uint }; IoError { code: code, extra: 0, detail: Some(os::error_string(code)), } } pub unsafe fn close(sock: sock_t) { let _ = libc::closesocket(sock); } } #[cfg(unix)] mod os { use libc; use std::rt::rtio::IoError; use io; pub type sock_t = io::file::fd_t; pub fn init() {} pub fn last_error() -> IoError { io::last_error() } pub unsafe fn close(sock: sock_t) { let _ = libc::close(sock); } }