core::rt: Begin implementing TcpStream

This ended up touching a lot of code related to error handling.
This commit is contained in:
Brian Anderson 2013-04-24 20:20:03 -07:00
parent 0b4d4edf8b
commit b2fbd34603
11 changed files with 282 additions and 57 deletions

View File

@ -30,6 +30,14 @@ macro_rules! rtdebug (
($( $arg:expr),+) => ( $(let _ = $arg)*; )
)
macro_rules! rtassert (
( $arg:expr ) => ( {
if !$arg {
abort!("assertion failed: %s", stringify!($arg));
}
} )
)
macro_rules! abort(
($( $msg:expr),+) => ( {
rtdebug!($($msg),+);

View File

@ -252,7 +252,9 @@ pub use self::stdio::println;
pub use self::file::FileStream;
pub use self::net::ip::IpAddr;
#[cfg(not(stage0))]
pub use self::net::tcp::TcpListener;
#[cfg(not(stage0))]
pub use self::net::tcp::TcpStream;
pub use self::net::udp::UdpStream;
@ -266,6 +268,7 @@ pub mod file;
/// Synchronous, non-blocking network I/O.
pub mod net {
#[cfg(not(stage0))]
pub mod tcp;
pub mod udp;
pub mod ip;
@ -326,12 +329,14 @@ pub struct IoError {
#[deriving(Eq)]
pub enum IoErrorKind {
PreviousIoError,
OtherIoError,
EndOfFile,
FileNotFound,
FilePermission,
PermissionDenied,
ConnectionFailed,
Closed,
OtherIoError,
PreviousIoError
ConnectionRefused,
}
// XXX: Can't put doc comments on macros

View File

@ -8,63 +8,179 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
use super::super::*;
use super::ip::IpAddr;
use option::{Option, Some, None};
use result::{Result, Ok, Err};
use ops::Drop;
use rt::sched::local_sched::unsafe_borrow_io;
use rt::io::net::ip::IpAddr;
use rt::io::{Reader, Writer, Listener};
use rt::io::io_error;
use rt::rtio;
use rt::rtio::{IoFactory, TcpListener, Stream};
pub struct TcpStream;
pub struct TcpStream {
rtstream: ~rtio::StreamObject
}
impl TcpStream {
pub fn connect(_addr: IpAddr) -> Option<TcpStream> {
fail!()
fn new(s: ~rtio::StreamObject) -> TcpStream {
TcpStream {
rtstream: s
}
}
pub fn connect(addr: IpAddr) -> Option<TcpStream> {
let stream = unsafe {
rtdebug!("borrowing io to connect");
let io = unsafe_borrow_io();
rtdebug!("about to connect");
io.connect(addr)
};
match stream {
Ok(s) => {
Some(TcpStream::new(s))
}
Err(ioerr) => {
rtdebug!("failed to connect: %?", ioerr);
io_error::cond.raise(ioerr);
return None;
}
}
}
}
impl Reader for TcpStream {
fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() }
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
let bytes_read = self.rtstream.read(buf);
match bytes_read {
Ok(read) => Some(read),
Err(_) => {
abort!("TODO");
}
}
}
fn eof(&mut self) -> bool { fail!() }
}
impl Writer for TcpStream {
fn write(&mut self, _buf: &[u8]) { fail!() }
fn write(&mut self, buf: &[u8]) {
let res = self.rtstream.write(buf);
match res {
Ok(_) => (),
Err(_) => {
abort!("TODO");
}
}
}
fn flush(&mut self) { fail!() }
}
pub struct TcpListener;
impl Drop for TcpStream {
fn finalize(&self) {
self.rtstream.close();
}
}
pub struct TcpListener {
rtlistener: ~rtio::TcpListenerObject
}
impl TcpListener {
pub fn bind(_addr: IpAddr) -> Option<TcpListener> {
fail!()
pub fn bind(addr: IpAddr) -> Option<TcpListener> {
let listener = unsafe { unsafe_borrow_io().bind(addr) };
match listener {
Ok(l) => {
Some(TcpListener {
rtlistener: l
})
}
Err(ioerr) => {
io_error::cond.raise(ioerr);
return None;
}
}
}
}
impl Listener<TcpStream> for TcpListener {
fn accept(&mut self) -> Option<TcpStream> { fail!() }
fn accept(&mut self) -> Option<TcpStream> {
let rtstream = self.rtlistener.listen();
match rtstream {
Some(s) => {
Some(TcpStream::new(s))
}
None => {
abort!("TODO");
}
}
}
}
impl Drop for TcpListener {
fn finalize(&self) {
self.rtlistener.close();
}
}
#[cfg(test)]
mod test {
use super::*;
use rt::test::*;
use rt::io::net::ip::Ipv4;
use rt::io::*;
#[test] #[ignore]
#[test]
fn bind_error() {
do run_in_newsched_task {
let mut called = false;
do io_error::cond.trap(|e| {
assert!(e.kind == PermissionDenied);
called = true;
}).in {
let addr = Ipv4(0, 0, 0, 0, 1);
let listener = TcpListener::bind(addr);
assert!(listener.is_none());
}
assert!(called);
}
}
#[test]
fn connect_error() {
do run_in_newsched_task {
let mut called = false;
do io_error::cond.trap(|e| {
assert!(e.kind == ConnectionRefused);
called = true;
}).in {
let addr = Ipv4(0, 0, 0, 0, 1);
let stream = TcpStream::connect(addr);
assert!(stream.is_none());
}
assert!(called);
}
}
#[test]
fn smoke_test() {
/*do run_in_newsched_task {
do run_in_newsched_task {
let addr = next_test_ip4();
do spawn_immediately {
let listener = TcpListener::bind(addr);
do listener.accept() {
let mut buf = [0];
listener.read(buf);
assert!(buf[0] == 99);
}
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == 99);
}
do spawn_immediately {
let stream = TcpStream::connect(addr);
do spawntask_immediately {
let mut stream = TcpStream::connect(addr);
stream.write([99]);
}
}*/
}
}
}

View File

@ -177,7 +177,8 @@ pub unsafe fn unsafe_borrow_local_services() -> &mut LocalServices {
transmute_mut_region(&mut task.local_services)
}
None => {
fail!(~"no local services for schedulers yet")
// Don't fail. Infinite recursion
abort!("no local services for schedulers yet")
}
}
}

View File

@ -8,7 +8,15 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/*! The Rust runtime, including the scheduler and I/O interface */
/*! The Rust runtime, including the scheduler and I/O interface
# XXX
* Unsafe uses of borrowed pointers should just use unsafe pointers
* Unwinding is not wired up correctly
*/
#[doc(hidden)];
@ -16,7 +24,7 @@ use libc::c_char;
#[path = "sched/mod.rs"]
mod sched;
mod rtio;
pub mod rtio;
pub mod uvll;
mod uvio;
#[path = "uv/mod.rs"]

View File

@ -11,6 +11,7 @@
use option::*;
use result::*;
use rt::io::IoError;
use super::io::net::ip::IpAddr;
// XXX: ~object doesn't work currently so these are some placeholder
@ -28,8 +29,8 @@ pub trait EventLoop {
}
pub trait IoFactory {
fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject>;
fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject>;
fn connect(&mut self, addr: IpAddr) -> Result<~StreamObject, IoError>;
fn bind(&mut self, addr: IpAddr) -> Result<~TcpListenerObject, IoError>;
}
pub trait TcpListener {

View File

@ -13,18 +13,21 @@
use prelude::*;
use ptr::mut_null;
use libc::c_void;
use cast::transmute;
use cast;
use cell::Cell;
use super::Scheduler;
use super::super::rtio::IoFactoryObject;
use tls = super::super::thread_local_storage;
use unstable::finally::Finally;
#[cfg(test)] use super::super::uvio::UvEventLoop;
/// Give the Scheduler to thread-local storage
pub fn put(sched: ~Scheduler) {
unsafe {
let key = tls_key();
let void_sched: *mut c_void = transmute::<~Scheduler, *mut c_void>(sched);
let void_sched: *mut c_void = cast::transmute(sched);
tls::set(key, void_sched);
}
}
@ -34,8 +37,8 @@ pub fn take() -> ~Scheduler {
unsafe {
let key = tls_key();
let void_sched: *mut c_void = tls::get(key);
assert!(void_sched.is_not_null());
let sched = transmute::<*mut c_void, ~Scheduler>(void_sched);
rtassert!(void_sched.is_not_null());
let sched: ~Scheduler = cast::transmute(void_sched);
tls::set(key, mut_null());
return sched;
}
@ -55,8 +58,18 @@ pub fn exists() -> bool {
/// While the scheduler is borrowed it is not available in TLS.
pub fn borrow(f: &fn(&mut Scheduler)) {
let mut sched = take();
f(sched);
put(sched);
// XXX: Need a different abstraction from 'finally' here to avoid unsafety
unsafe {
let unsafe_sched = cast::transmute_mut_region(&mut *sched);
let sched = Cell(sched);
do (|| {
f(unsafe_sched);
}).finally {
put(sched.take());
}
}
}
/// Borrow a mutable reference to the thread-local Scheduler
@ -68,11 +81,11 @@ pub fn borrow(f: &fn(&mut Scheduler)) {
pub unsafe fn unsafe_borrow() -> &mut Scheduler {
let key = tls_key();
let mut void_sched: *mut c_void = tls::get(key);
assert!(void_sched.is_not_null());
rtassert!(void_sched.is_not_null());
{
let void_sched_ptr = &mut void_sched;
let sched: &mut ~Scheduler = {
transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr)
cast::transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr)
};
let sched: &mut Scheduler = &mut **sched;
return sched;
@ -91,7 +104,7 @@ fn tls_key() -> tls::Key {
fn maybe_tls_key() -> Option<tls::Key> {
unsafe {
let key: *mut c_void = rust_get_sched_tls_key();
let key: &mut tls::Key = transmute(key);
let key: &mut tls::Key = cast::transmute(key);
let key = *key;
// Check that the key has been initialized.

View File

@ -34,17 +34,22 @@ via `close` and `delete` methods.
*/
use libc;
use vec;
use ptr;
use cast;
use str;
use option::*;
use str::raw::from_c_str;
use to_str::ToStr;
use vec;
use ptr;
use libc::{c_void, c_int, size_t, malloc, free};
use cast::transmute;
use ptr::null;
use super::uvll;
use unstable::finally::Finally;
use rt::uvll;
use rt::io::{IoError, FileNotFound};
#[cfg(test)] use unstable::run_in_bare_thread;
pub use self::file::{FsRequest, FsCallback};
@ -211,6 +216,55 @@ fn error_smoke_test() {
assert!(err.to_str() == ~"EOF: end of file");
}
pub fn last_uv_error<H, W: Watcher + NativeHandle<*H>>(watcher: &W) -> UvError {
unsafe {
let loop_ = loop_from_watcher(watcher);
UvError(uvll::last_error(loop_.native_handle()))
}
}
pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
// XXX: Could go in str::raw
unsafe fn c_str_to_static_slice(s: *libc::c_char) -> &'static str {
let s = s as *u8;
let mut curr = s, len = 0u;
while *curr != 0u8 {
len += 1u;
curr = ptr::offset(s, len);
}
str::raw::buf_as_slice(s, len, |d| cast::transmute(d))
}
unsafe {
// Importing error constants
use rt::uvll::*;
use rt::io::*;
// uv error descriptions are static
let c_desc = uvll::strerror(&*uverr);
let desc = c_str_to_static_slice(c_desc);
let kind = match uverr.code {
UNKNOWN => OtherIoError,
OK => OtherIoError,
EOF => EndOfFile,
EACCES => PermissionDenied,
ECONNREFUSED => ConnectionRefused,
e => {
abort!("unknown uv error code: %u", e as uint);
}
};
IoError {
kind: kind,
desc: desc,
detail: None
}
}
}
/// Given a uv handle, convert a callback status to a UvError
// XXX: Follow the pattern below by parameterizing over T: Watcher, not T

View File

@ -18,13 +18,14 @@ use super::{Loop, Watcher, Request, UvError, Buf, Callback, NativeHandle, NullCa
install_watcher_data, get_watcher_data, drop_watcher_data,
vec_to_uv_buf, vec_from_uv_buf};
use super::super::io::net::ip::{IpAddr, Ipv4, Ipv6};
use rt::uv::last_uv_error;
#[cfg(test)] use cell::Cell;
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use super::super::thread::Thread;
#[cfg(test)] use super::super::test::*;
fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) {
fn ip4_as_uv_ip4<T>(addr: IpAddr, f: &fn(*sockaddr_in) -> T) -> T {
match addr {
Ipv4(a, b, c, d, p) => {
unsafe {
@ -34,7 +35,7 @@ fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) {
c as uint,
d as uint), p as int);
do (|| {
f(addr);
f(addr)
}).finally {
free_ip4_addr(addr);
}
@ -193,15 +194,18 @@ pub impl TcpWatcher {
}
}
fn bind(&mut self, address: IpAddr) {
fn bind(&mut self, address: IpAddr) -> Result<(), UvError> {
match address {
Ipv4(*) => {
do ip4_as_uv_ip4(address) |addr| {
let result = unsafe {
uvll::tcp_bind(self.native_handle(), addr)
};
// XXX: bind is likely to fail. need real error handling
assert!(result == 0);
if result == 0 {
Ok(())
} else {
Err(last_uv_error(self))
}
}
}
_ => fail!()

View File

@ -11,6 +11,7 @@
use option::*;
use result::*;
use rt::io::IoError;
use super::io::net::ip::IpAddr;
use super::uv::*;
use super::rtio::*;
@ -98,11 +99,11 @@ impl IoFactory for UvIoFactory {
// Connect to an address and return a new stream
// NB: This blocks the task waiting on the connection.
// It would probably be better to return a future
fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject> {
fn connect(&mut self, addr: IpAddr) -> Result<~StreamObject, IoError> {
// Create a cell in the task to hold the result. We will fill
// the cell before resuming the task.
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
let result_cell_ptr: *Cell<Result<~StreamObject, IoError>> = &result_cell;
let scheduler = local_sched::take();
assert!(scheduler.in_task_context());
@ -122,11 +123,12 @@ impl IoFactory for UvIoFactory {
rtdebug!("connect: in connect callback");
let maybe_stream = if status.is_none() {
rtdebug!("status is none");
Some(~UvStream(stream_watcher))
Ok(~UvStream(stream_watcher))
} else {
rtdebug!("status is some");
// XXX: Wait for close
stream_watcher.close(||());
None
Err(uv_error_to_io_error(status.get()))
};
// Store the stream in the task's stack
@ -142,10 +144,16 @@ impl IoFactory for UvIoFactory {
return result_cell.take();
}
fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject> {
fn bind(&mut self, addr: IpAddr) -> Result<~TcpListenerObject, IoError> {
let mut watcher = TcpWatcher::new(self.uv_loop());
watcher.bind(addr);
return Some(~UvTcpListener(watcher));
match watcher.bind(addr) {
Ok(_) => Ok(~UvTcpListener(watcher)),
Err(uverr) => {
// XXX: Should we wait until close completes?
watcher.as_stream().close(||());
Err(uv_error_to_io_error(uverr))
}
}
}
}
@ -321,7 +329,7 @@ fn test_simple_io_no_connect() {
let io = unsafe { local_sched::unsafe_borrow_io() };
let addr = next_test_ip4();
let maybe_chan = io.connect(addr);
assert!(maybe_chan.is_none());
assert!(maybe_chan.is_err());
}
}

View File

@ -33,6 +33,13 @@ use libc::{size_t, c_int, c_uint, c_void, c_char, uintptr_t};
use libc::{malloc, free};
use prelude::*;
pub static UNKNOWN: c_int = -1;
pub static OK: c_int = 0;
pub static EOF: c_int = 1;
pub static EADDRINFO: c_int = 2;
pub static EACCES: c_int = 3;
pub static ECONNREFUSED: c_int = 12;
pub struct uv_err_t {
code: c_int,
sys_errno_: c_int