// Copyright 2013 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. use option::{Option, Some, None}; use result::{Ok, Err}; use rt::io::net::ip::SocketAddr; use rt::io::{Reader, Writer, Listener}; use rt::io::{io_error, read_error, EndOfFile}; use rt::rtio::{IoFactory, IoFactoryObject, RtioSocket, RtioTcpListener, RtioTcpListenerObject, RtioTcpStream, RtioTcpStreamObject, RtioStream}; use rt::local::Local; pub struct TcpStream(~RtioTcpStreamObject); impl TcpStream { fn new(s: ~RtioTcpStreamObject) -> TcpStream { TcpStream(s) } pub fn connect(addr: SocketAddr) -> Option { let stream = unsafe { rtdebug!("borrowing io to connect"); let io: *mut IoFactoryObject = Local::unsafe_borrow(); rtdebug!("about to connect"); (*io).tcp_connect(addr) }; match stream { Ok(s) => Some(TcpStream::new(s)), Err(ioerr) => { rtdebug!("failed to connect: %?", ioerr); io_error::cond.raise(ioerr); None } } } pub fn peer_name(&mut self) -> Option { match (**self).peer_name() { Ok(pn) => Some(pn), Err(ioerr) => { rtdebug!("failed to get peer name: %?", ioerr); io_error::cond.raise(ioerr); None } } } pub fn socket_name(&mut self) -> Option { match (**self).socket_name() { Ok(sn) => Some(sn), Err(ioerr) => { rtdebug!("failed to get socket name: %?", ioerr); io_error::cond.raise(ioerr); None } } } } impl Reader for TcpStream { fn read(&mut self, buf: &mut [u8]) -> Option { match (***self).read(buf) { Ok(read) => Some(read), Err(ioerr) => { // EOF is indicated by returning None if ioerr.kind != EndOfFile { read_error::cond.raise(ioerr); } return None; } } } fn eof(&mut self) -> bool { fail!() } } impl Writer for TcpStream { fn write(&mut self, buf: &[u8]) { match (***self).write(buf) { Ok(_) => (), Err(ioerr) => io_error::cond.raise(ioerr), } } fn flush(&mut self) { fail!() } } pub struct TcpListener(~RtioTcpListenerObject); impl TcpListener { pub fn bind(addr: SocketAddr) -> Option { let listener = unsafe { let io: *mut IoFactoryObject = Local::unsafe_borrow(); (*io).tcp_bind(addr) }; match listener { Ok(l) => Some(TcpListener(l)), Err(ioerr) => { io_error::cond.raise(ioerr); return None; } } } pub fn socket_name(&mut self) -> Option { match (**self).socket_name() { Ok(sn) => Some(sn), Err(ioerr) => { rtdebug!("failed to get socket name: %?", ioerr); io_error::cond.raise(ioerr); None } } } } impl Listener for TcpListener { fn accept(&mut self) -> Option { match (**self).accept() { Ok(s) => Some(TcpStream::new(s)), Err(ioerr) => { io_error::cond.raise(ioerr); return None; } } } } #[cfg(test)] mod test { use super::*; use cell::Cell; use rt::test::*; use rt::io::net::ip::{Ipv4Addr, SocketAddr}; use rt::io::*; use prelude::*; #[test] #[ignore] fn bind_error() { do run_in_newsched_task { let mut called = false; do io_error::cond.trap(|e| { assert!(e.kind == PermissionDenied); called = true; }).inside { let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 }; let listener = TcpListener::bind(addr); assert!(listener.is_none()); } assert!(called); } } #[test] fn connect_error() { do run_in_newsched_task { let mut called = false; do io_error::cond.trap(|e| { assert_eq!(e.kind, ConnectionRefused); called = true; }).inside { let addr = SocketAddr { ip: Ipv4Addr(0, 0, 0, 0), port: 1 }; let stream = TcpStream::connect(addr); assert!(stream.is_none()); } assert!(called); } } #[test] fn smoke_test_ip4() { do run_in_newsched_task { let addr = next_test_ip4(); do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; stream.read(buf); assert!(buf[0] == 99); } do spawntask { let mut stream = TcpStream::connect(addr); stream.write([99]); } } } #[test] fn smoke_test_ip6() { do run_in_newsched_task { let addr = next_test_ip6(); do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; stream.read(buf); assert!(buf[0] == 99); } do spawntask { let mut stream = TcpStream::connect(addr); stream.write([99]); } } } #[test] fn read_eof_ip4() { do run_in_newsched_task { let addr = next_test_ip4(); do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); } do spawntask { let _stream = TcpStream::connect(addr); // Close } } } #[test] fn read_eof_ip6() { do run_in_newsched_task { let addr = next_test_ip6(); do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); } do spawntask { let _stream = TcpStream::connect(addr); // Close } } } #[test] fn read_eof_twice_ip4() { do run_in_newsched_task { let addr = next_test_ip4(); do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); let nread = stream.read(buf); assert!(nread.is_none()); } do spawntask { let _stream = TcpStream::connect(addr); // Close } } } #[test] fn read_eof_twice_ip6() { do run_in_newsched_task { let addr = next_test_ip6(); do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); let nread = stream.read(buf); assert!(nread.is_none()); } do spawntask { let _stream = TcpStream::connect(addr); // Close } } } #[test] fn write_close_ip4() { do run_in_newsched_task { let addr = next_test_ip4(); do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let buf = [0]; loop { let mut stop = false; do io_error::cond.trap(|e| { // NB: ECONNRESET on linux, EPIPE on mac assert!(e.kind == ConnectionReset || e.kind == BrokenPipe); stop = true; }).inside { stream.write(buf); } if stop { break } } } do spawntask { let _stream = TcpStream::connect(addr); // Close } } } #[test] fn write_close_ip6() { do run_in_newsched_task { let addr = next_test_ip6(); do spawntask { let mut listener = TcpListener::bind(addr); let mut stream = listener.accept(); let buf = [0]; loop { let mut stop = false; do io_error::cond.trap(|e| { // NB: ECONNRESET on linux, EPIPE on mac assert!(e.kind == ConnectionReset || e.kind == BrokenPipe); stop = true; }).inside { stream.write(buf); } if stop { break } } } do spawntask { let _stream = TcpStream::connect(addr); // Close } } } #[test] fn multiple_connect_serial_ip4() { do run_in_newsched_task { let addr = next_test_ip4(); let max = 10; do spawntask { let mut listener = TcpListener::bind(addr); do max.times { let mut stream = listener.accept(); let mut buf = [0]; stream.read(buf); assert_eq!(buf[0], 99); } } do spawntask { do max.times { let mut stream = TcpStream::connect(addr); stream.write([99]); } } } } #[test] fn multiple_connect_serial_ip6() { do run_in_newsched_task { let addr = next_test_ip6(); let max = 10; do spawntask { let mut listener = TcpListener::bind(addr); do max.times { let mut stream = listener.accept(); let mut buf = [0]; stream.read(buf); assert_eq!(buf[0], 99); } } do spawntask { do max.times { let mut stream = TcpStream::connect(addr); stream.write([99]); } } } } #[test] fn multiple_connect_interleaved_greedy_schedule_ip4() { do run_in_newsched_task { let addr = next_test_ip4(); static MAX: int = 10; do spawntask { let mut listener = TcpListener::bind(addr); for i in range(0, MAX) { let stream = Cell::new(listener.accept()); rtdebug!("accepted"); // Start another task to handle the connection do spawntask { let mut stream = stream.take(); let mut buf = [0]; stream.read(buf); assert!(buf[0] == i as u8); rtdebug!("read"); } } } connect(0, addr); fn connect(i: int, addr: SocketAddr) { if i == MAX { return } do spawntask { rtdebug!("connecting"); let mut stream = TcpStream::connect(addr); // Connect again before writing connect(i + 1, addr); rtdebug!("writing"); stream.write([i as u8]); } } } } #[test] fn multiple_connect_interleaved_greedy_schedule_ip6() { do run_in_newsched_task { let addr = next_test_ip6(); static MAX: int = 10; do spawntask { let mut listener = TcpListener::bind(addr); for i in range(0, MAX) { let stream = Cell::new(listener.accept()); rtdebug!("accepted"); // Start another task to handle the connection do spawntask { let mut stream = stream.take(); let mut buf = [0]; stream.read(buf); assert!(buf[0] == i as u8); rtdebug!("read"); } } } connect(0, addr); fn connect(i: int, addr: SocketAddr) { if i == MAX { return } do spawntask { rtdebug!("connecting"); let mut stream = TcpStream::connect(addr); // Connect again before writing connect(i + 1, addr); rtdebug!("writing"); stream.write([i as u8]); } } } } #[test] fn multiple_connect_interleaved_lazy_schedule_ip4() { do run_in_newsched_task { let addr = next_test_ip4(); static MAX: int = 10; do spawntask { let mut listener = TcpListener::bind(addr); for _ in range(0, MAX) { let stream = Cell::new(listener.accept()); rtdebug!("accepted"); // Start another task to handle the connection do spawntask_later { let mut stream = stream.take(); let mut buf = [0]; stream.read(buf); assert!(buf[0] == 99); rtdebug!("read"); } } } connect(0, addr); fn connect(i: int, addr: SocketAddr) { if i == MAX { return } do spawntask_later { rtdebug!("connecting"); let mut stream = TcpStream::connect(addr); // Connect again before writing connect(i + 1, addr); rtdebug!("writing"); stream.write([99]); } } } } #[test] fn multiple_connect_interleaved_lazy_schedule_ip6() { do run_in_newsched_task { let addr = next_test_ip6(); static MAX: int = 10; do spawntask { let mut listener = TcpListener::bind(addr); for _ in range(0, MAX) { let stream = Cell::new(listener.accept()); rtdebug!("accepted"); // Start another task to handle the connection do spawntask_later { let mut stream = stream.take(); let mut buf = [0]; stream.read(buf); assert!(buf[0] == 99); rtdebug!("read"); } } } connect(0, addr); fn connect(i: int, addr: SocketAddr) { if i == MAX { return } do spawntask_later { rtdebug!("connecting"); let mut stream = TcpStream::connect(addr); // Connect again before writing connect(i + 1, addr); rtdebug!("writing"); stream.write([99]); } } } } #[cfg(test)] fn socket_name(addr: SocketAddr) { do run_in_newsched_task { do spawntask { let listener = TcpListener::bind(addr); assert!(listener.is_some()); let mut listener = listener.unwrap(); // Make sure socket_name gives // us the socket we binded to. let so_name = listener.socket_name(); assert!(so_name.is_some()); assert_eq!(addr, so_name.unwrap()); } } } #[cfg(test)] fn peer_name(addr: SocketAddr) { do run_in_newsched_task { do spawntask { let mut listener = TcpListener::bind(addr); listener.accept(); } do spawntask { let stream = TcpStream::connect(addr); assert!(stream.is_some()); let mut stream = stream.unwrap(); // Make sure peer_name gives us the // address/port of the peer we've // connected to. let peer_name = stream.peer_name(); assert!(peer_name.is_some()); assert_eq!(addr, peer_name.unwrap()); } } } #[test] fn socket_and_peer_name_ip4() { peer_name(next_test_ip4()); socket_name(next_test_ip4()); } #[test] fn socket_and_peer_name_ip6() { // XXX: peer name is not consistent //peer_name(next_test_ip6()); socket_name(next_test_ip6()); } }