From 93ca5ebccb4ff6761fc61b31f7a9e1e6ffc866df Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Fri, 26 Apr 2013 18:59:59 -0700 Subject: [PATCH] core::rt: Clean up the interface to rtio Make names that better match rt::io. Return error types. --- src/libcore/rt/io/mod.rs | 1 + src/libcore/rt/io/net/tcp.rs | 23 ++++++----- src/libcore/rt/rtio.rs | 18 ++++---- src/libcore/rt/uv/mod.rs | 2 +- src/libcore/rt/uvio.rs | 79 ++++++++++++++++++------------------ 5 files changed, 63 insertions(+), 60 deletions(-) diff --git a/src/libcore/rt/io/mod.rs b/src/libcore/rt/io/mod.rs index 93daa36dd60..8f56005d0a4 100644 --- a/src/libcore/rt/io/mod.rs +++ b/src/libcore/rt/io/mod.rs @@ -238,6 +238,7 @@ Out of scope * How does I/O relate to the Iterator trait? * std::base64 filters * Using conditions is a big unknown since we don't have much experience with them +* Too many uses of OtherIoError */ diff --git a/src/libcore/rt/io/net/tcp.rs b/src/libcore/rt/io/net/tcp.rs index 2ac2ffb60a8..95f43b259ce 100644 --- a/src/libcore/rt/io/net/tcp.rs +++ b/src/libcore/rt/io/net/tcp.rs @@ -9,21 +9,22 @@ // except according to those terms. use option::{Option, Some, None}; -use result::{Result, Ok, Err}; +use result::{Ok, Err}; use ops::Drop; use rt::sched::local_sched::unsafe_borrow_io; use rt::io::net::ip::IpAddr; use rt::io::{Reader, Writer, Listener}; use rt::io::io_error; -use rt::rtio; -use rt::rtio::{IoFactory, TcpListener, Stream}; +use rt::rtio::{IoFactory, + RtioTcpListener, RtioTcpListenerObject, + RtioTcpStream, RtioTcpStreamObject}; pub struct TcpStream { - rtstream: ~rtio::StreamObject + rtstream: ~RtioTcpStreamObject } impl TcpStream { - fn new(s: ~rtio::StreamObject) -> TcpStream { + fn new(s: ~RtioTcpStreamObject) -> TcpStream { TcpStream { rtstream: s } @@ -34,7 +35,7 @@ impl TcpStream { rtdebug!("borrowing io to connect"); let io = unsafe_borrow_io(); rtdebug!("about to connect"); - io.connect(addr) + io.tcp_connect(addr) }; match stream { @@ -85,12 +86,12 @@ impl Drop for TcpStream { } pub struct TcpListener { - rtlistener: ~rtio::TcpListenerObject + rtlistener: ~RtioTcpListenerObject } impl TcpListener { pub fn bind(addr: IpAddr) -> Option { - let listener = unsafe { unsafe_borrow_io().bind(addr) }; + let listener = unsafe { unsafe_borrow_io().tcp_bind(addr) }; match listener { Ok(l) => { Some(TcpListener { @@ -107,12 +108,12 @@ impl TcpListener { impl Listener for TcpListener { fn accept(&mut self) -> Option { - let rtstream = self.rtlistener.listen(); + let rtstream = self.rtlistener.accept(); match rtstream { - Some(s) => { + Ok(s) => { Some(TcpStream::new(s)) } - None => { + Err(_) => { abort!("TODO"); } } diff --git a/src/libcore/rt/rtio.rs b/src/libcore/rt/rtio.rs index 961a032607e..1d8604bc3fd 100644 --- a/src/libcore/rt/rtio.rs +++ b/src/libcore/rt/rtio.rs @@ -18,8 +18,8 @@ use super::io::net::ip::IpAddr; // types to use instead pub type EventLoopObject = super::uvio::UvEventLoop; pub type IoFactoryObject = super::uvio::UvIoFactory; -pub type StreamObject = super::uvio::UvStream; -pub type TcpListenerObject = super::uvio::UvTcpListener; +pub type RtioTcpStreamObject = super::uvio::UvTcpStream; +pub type RtioTcpListenerObject = super::uvio::UvTcpListener; pub trait EventLoop { fn run(&mut self); @@ -29,15 +29,15 @@ pub trait EventLoop { } pub trait IoFactory { - fn connect(&mut self, addr: IpAddr) -> Result<~StreamObject, IoError>; - fn bind(&mut self, addr: IpAddr) -> Result<~TcpListenerObject, IoError>; + fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>; + fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>; } -pub trait TcpListener { - fn listen(&mut self) -> Option<~StreamObject>; +pub trait RtioTcpListener { + fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>; } -pub trait Stream { - fn read(&mut self, buf: &mut [u8]) -> Result; - fn write(&mut self, buf: &[u8]) -> Result<(), ()>; +pub trait RtioTcpStream { + fn read(&mut self, buf: &mut [u8]) -> Result; + fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; } diff --git a/src/libcore/rt/uv/mod.rs b/src/libcore/rt/uv/mod.rs index 87aa7524ed6..24b6c353cce 100644 --- a/src/libcore/rt/uv/mod.rs +++ b/src/libcore/rt/uv/mod.rs @@ -48,7 +48,7 @@ use ptr::null; use unstable::finally::Finally; use rt::uvll; -use rt::io::{IoError, FileNotFound}; +use rt::io::IoError; #[cfg(test)] use unstable::run_in_bare_thread; diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs index 2c4ff37e4be..70f233a29d3 100644 --- a/src/libcore/rt/uvio.rs +++ b/src/libcore/rt/uvio.rs @@ -10,19 +10,20 @@ use option::*; use result::*; - -use rt::io::IoError; -use super::io::net::ip::IpAddr; -use super::uv::*; -use super::rtio::*; use ops::Drop; use cell::{Cell, empty_cell}; use cast::transmute; -use super::sched::{Scheduler, local_sched}; + +use rt::io::IoError; +use rt::io::net::ip::IpAddr; +use rt::uv::*; +use rt::rtio::*; +use rt::sched::{Scheduler, local_sched}; +use rt::io::{standard_error, OtherIoError}; #[cfg(test)] use uint; #[cfg(test)] use unstable::run_in_bare_thread; -#[cfg(test)] use super::test::*; +#[cfg(test)] use rt::test::*; pub struct UvEventLoop { uvio: UvIoFactory @@ -99,11 +100,11 @@ impl IoFactory for UvIoFactory { // Connect to an address and return a new stream // NB: This blocks the task waiting on the connection. // It would probably be better to return a future - fn connect(&mut self, addr: IpAddr) -> Result<~StreamObject, IoError> { + fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError> { // Create a cell in the task to hold the result. We will fill // the cell before resuming the task. let result_cell = empty_cell(); - let result_cell_ptr: *Cell> = &result_cell; + let result_cell_ptr: *Cell> = &result_cell; let scheduler = local_sched::take(); assert!(scheduler.in_task_context()); @@ -123,7 +124,7 @@ impl IoFactory for UvIoFactory { rtdebug!("connect: in connect callback"); let maybe_stream = if status.is_none() { rtdebug!("status is none"); - Ok(~UvStream(stream_watcher)) + Ok(~UvTcpStream(stream_watcher)) } else { rtdebug!("status is some"); // XXX: Wait for close @@ -144,7 +145,7 @@ impl IoFactory for UvIoFactory { return result_cell.take(); } - fn bind(&mut self, addr: IpAddr) -> Result<~TcpListenerObject, IoError> { + fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError> { let mut watcher = TcpWatcher::new(self.uv_loop()); match watcher.bind(addr) { Ok(_) => Ok(~UvTcpListener(watcher)), @@ -177,12 +178,12 @@ impl Drop for UvTcpListener { } } -impl TcpListener for UvTcpListener { +impl RtioTcpListener for UvTcpListener { - fn listen(&mut self) -> Option<~StreamObject> { + fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> { rtdebug!("entering listen"); let result_cell = empty_cell(); - let result_cell_ptr: *Cell> = &result_cell; + let result_cell_ptr: *Cell> = &result_cell; let server_tcp_watcher = self.watcher(); @@ -199,9 +200,9 @@ impl TcpListener for UvTcpListener { let client_tcp_watcher = TcpWatcher::new(&mut loop_).as_stream(); // XXX: Needs to be surfaced in interface server_stream_watcher.accept(client_tcp_watcher); - Some(~UvStream::new(client_tcp_watcher)) + Ok(~UvTcpStream::new(client_tcp_watcher)) } else { - None + Err(standard_error(OtherIoError)) }; unsafe { (*result_cell_ptr).put_back(maybe_stream); } @@ -218,15 +219,15 @@ impl TcpListener for UvTcpListener { } } -pub struct UvStream(StreamWatcher); +pub struct UvTcpStream(StreamWatcher); -impl UvStream { - fn new(watcher: StreamWatcher) -> UvStream { - UvStream(watcher) +impl UvTcpStream { + fn new(watcher: StreamWatcher) -> UvTcpStream { + UvTcpStream(watcher) } fn watcher(&self) -> StreamWatcher { - match self { &UvStream(w) => w } + match self { &UvTcpStream(w) => w } } // XXX: finalize isn't working for ~UvStream??? @@ -236,17 +237,17 @@ impl UvStream { } } -impl Drop for UvStream { +impl Drop for UvTcpStream { fn finalize(&self) { rtdebug!("closing stream"); //self.watcher().close(||()); } } -impl Stream for UvStream { - fn read(&mut self, buf: &mut [u8]) -> Result { +impl RtioTcpStream for UvTcpStream { + fn read(&mut self, buf: &mut [u8]) -> Result { let result_cell = empty_cell(); - let result_cell_ptr: *Cell> = &result_cell; + let result_cell_ptr: *Cell> = &result_cell; let scheduler = local_sched::take(); assert!(scheduler.in_task_context()); @@ -277,7 +278,7 @@ impl Stream for UvStream { assert!(nread >= 0); Ok(nread as uint) } else { - Err(()) + Err(standard_error(OtherIoError)) }; unsafe { (*result_cell_ptr).put_back(result); } @@ -291,9 +292,9 @@ impl Stream for UvStream { return result_cell.take(); } - fn write(&mut self, buf: &[u8]) -> Result<(), ()> { + fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { let result_cell = empty_cell(); - let result_cell_ptr: *Cell> = &result_cell; + let result_cell_ptr: *Cell> = &result_cell; let scheduler = local_sched::take(); assert!(scheduler.in_task_context()); let watcher = self.watcher(); @@ -308,7 +309,7 @@ impl Stream for UvStream { let result = if status.is_none() { Ok(()) } else { - Err(()) + Err(standard_error(OtherIoError)) }; unsafe { (*result_cell_ptr).put_back(result); } @@ -328,7 +329,7 @@ fn test_simple_io_no_connect() { do run_in_newsched_task { let io = unsafe { local_sched::unsafe_borrow_io() }; let addr = next_test_ip4(); - let maybe_chan = io.connect(addr); + let maybe_chan = io.tcp_connect(addr); assert!(maybe_chan.is_err()); } } @@ -342,8 +343,8 @@ fn test_simple_tcp_server_and_client() { do spawntask_immediately { unsafe { let io = local_sched::unsafe_borrow_io(); - let mut listener = io.bind(addr).unwrap(); - let mut stream = listener.listen().unwrap(); + let mut listener = io.tcp_bind(addr).unwrap(); + let mut stream = listener.accept().unwrap(); let mut buf = [0, .. 2048]; let nread = stream.read(buf).unwrap(); assert!(nread == 8); @@ -359,7 +360,7 @@ fn test_simple_tcp_server_and_client() { do spawntask_immediately { unsafe { let io = local_sched::unsafe_borrow_io(); - let mut stream = io.connect(addr).unwrap(); + let mut stream = io.tcp_connect(addr).unwrap(); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); stream.close(); } @@ -374,8 +375,8 @@ fn test_read_and_block() { do spawntask_immediately { let io = unsafe { local_sched::unsafe_borrow_io() }; - let mut listener = io.bind(addr).unwrap(); - let mut stream = listener.listen().unwrap(); + let mut listener = io.tcp_bind(addr).unwrap(); + let mut stream = listener.accept().unwrap(); let mut buf = [0, .. 2048]; let expected = 32; @@ -412,7 +413,7 @@ fn test_read_and_block() { do spawntask_immediately { let io = unsafe { local_sched::unsafe_borrow_io() }; - let mut stream = io.connect(addr).unwrap(); + let mut stream = io.tcp_connect(addr).unwrap(); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); @@ -432,8 +433,8 @@ fn test_read_read_read() { do spawntask_immediately { unsafe { let io = local_sched::unsafe_borrow_io(); - let mut listener = io.bind(addr).unwrap(); - let mut stream = listener.listen().unwrap(); + let mut listener = io.tcp_bind(addr).unwrap(); + let mut stream = listener.accept().unwrap(); let mut buf = [1, .. 2048]; let mut total_bytes_written = 0; while total_bytes_written < MAX { @@ -447,7 +448,7 @@ fn test_read_read_read() { do spawntask_immediately { let io = unsafe { local_sched::unsafe_borrow_io() }; - let mut stream = io.connect(addr).unwrap(); + let mut stream = io.tcp_connect(addr).unwrap(); let mut buf = [0, .. 2048]; let mut total_bytes_read = 0; while total_bytes_read < MAX {