librustuv: Change with_local_io to use RAII.

This commit is contained in:
Patrick Walton 2013-12-05 17:25:48 -08:00
parent 8c2ebe1622
commit 6bd80f7450
14 changed files with 186 additions and 157 deletions

View File

@ -9,6 +9,7 @@
// except according to those terms.
use std::c_str::CString;
use std::cast;
use std::comm::SharedChan;
use std::libc::c_int;
use std::libc;
@ -161,8 +162,11 @@ impl EventLoop for UvEventLoop {
~AsyncWatcher::new(self.uvio.uv_loop(), f) as ~RemoteCallback
}
fn io<'a>(&'a mut self, f: |&'a mut IoFactory|) {
f(&mut self.uvio as &mut IoFactory)
fn io(&mut self) -> &'static mut IoFactory:'static {
unsafe {
let factory = &mut self.uvio as &mut IoFactory;
cast::transmute(factory)
}
}
}

View File

@ -51,7 +51,7 @@ use iter::Iterator;
use super::{Reader, Writer, Seek};
use super::{SeekStyle, Read, Write, Open, IoError, Truncate,
FileMode, FileAccess, FileStat, io_error, FilePermission};
use rt::rtio::{RtioFileStream, IoFactory, with_local_io};
use rt::rtio::{RtioFileStream, IoFactory, LocalIo};
use io;
use option::{Some, None, Option};
use result::{Ok, Err, Result};
@ -76,15 +76,14 @@ pub struct File {
}
fn io_raise<T>(f: |io: &mut IoFactory| -> Result<T, IoError>) -> Option<T> {
with_local_io(|io| {
match f(io) {
Ok(t) => Some(t),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
let mut io = LocalIo::borrow();
match f(io.get()) {
Ok(t) => Some(t),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
})
}
}
impl File {
@ -132,19 +131,18 @@ impl File {
pub fn open_mode(path: &Path,
mode: FileMode,
access: FileAccess) -> Option<File> {
with_local_io(|io| {
match io.fs_open(&path.to_c_str(), mode, access) {
Ok(fd) => Some(File {
path: path.clone(),
fd: fd,
last_nread: -1
}),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
let mut io = LocalIo::borrow();
match io.get().fs_open(&path.to_c_str(), mode, access) {
Ok(fd) => Some(File {
path: path.clone(),
fd: fd,
last_nread: -1
}),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
})
}
}
/// Attempts to open a file in read-only mode. This function is equivalent to

View File

@ -223,3 +223,6 @@ impl rtio::IoFactory for IoFactory {
Err(unimpl())
}
}
pub static mut NATIVE_IO_FACTORY: IoFactory = IoFactory;

View File

@ -21,7 +21,7 @@ use option::{Option, Some, None};
use result::{Ok, Err};
use io::{io_error};
use io::net::ip::{SocketAddr, IpAddr};
use rt::rtio::{IoFactory, with_local_io};
use rt::rtio::{IoFactory, LocalIo};
use vec::ImmutableVector;
/// Hints to the types of sockets that are desired when looking up hosts
@ -95,17 +95,16 @@ pub fn get_host_addresses(host: &str) -> Option<~[IpAddr]> {
///
/// XXX: this is not public because the `Hint` structure is not ready for public
/// consumption just yet.
fn lookup(hostname: Option<&str>, servname: Option<&str>,
hint: Option<Hint>) -> Option<~[Info]> {
with_local_io(|io| {
match io.get_host_addresses(hostname, servname, hint) {
Ok(i) => Some(i),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
fn lookup(hostname: Option<&str>, servname: Option<&str>, hint: Option<Hint>)
-> Option<~[Info]> {
let mut io = LocalIo::borrow();
match io.get().get_host_addresses(hostname, servname, hint) {
Ok(i) => Some(i),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
})
}
}
#[cfg(test)]

View File

@ -13,8 +13,8 @@ use result::{Ok, Err};
use io::net::ip::SocketAddr;
use io::{Reader, Writer, Listener, Acceptor};
use io::{io_error, EndOfFile};
use rt::rtio::{IoFactory, with_local_io,
RtioSocket, RtioTcpListener, RtioTcpAcceptor, RtioTcpStream};
use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener};
use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
pub struct TcpStream {
priv obj: ~RtioTcpStream
@ -26,15 +26,17 @@ impl TcpStream {
}
pub fn connect(addr: SocketAddr) -> Option<TcpStream> {
with_local_io(|io| {
match io.tcp_connect(addr) {
Ok(s) => Some(TcpStream::new(s)),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
let result = {
let mut io = LocalIo::borrow();
io.get().tcp_connect(addr)
};
match result {
Ok(s) => Some(TcpStream::new(s)),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
})
}
}
pub fn peer_name(&mut self) -> Option<SocketAddr> {
@ -92,15 +94,14 @@ pub struct TcpListener {
impl TcpListener {
pub fn bind(addr: SocketAddr) -> Option<TcpListener> {
with_local_io(|io| {
match io.tcp_bind(addr) {
Ok(l) => Some(TcpListener { obj: l }),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
let mut io = LocalIo::borrow();
match io.get().tcp_bind(addr) {
Ok(l) => Some(TcpListener { obj: l }),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
})
}
}
pub fn socket_name(&mut self) -> Option<SocketAddr> {

View File

@ -13,7 +13,7 @@ use result::{Ok, Err};
use io::net::ip::SocketAddr;
use io::{Reader, Writer};
use io::{io_error, EndOfFile};
use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory, with_local_io};
use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory, LocalIo};
pub struct UdpSocket {
priv obj: ~RtioUdpSocket
@ -21,15 +21,14 @@ pub struct UdpSocket {
impl UdpSocket {
pub fn bind(addr: SocketAddr) -> Option<UdpSocket> {
with_local_io(|io| {
match io.udp_bind(addr) {
Ok(s) => Some(UdpSocket { obj: s }),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
let mut io = LocalIo::borrow();
match io.get().udp_bind(addr) {
Ok(s) => Some(UdpSocket { obj: s }),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
})
}
}
pub fn recvfrom(&mut self, buf: &mut [u8]) -> Option<(uint, SocketAddr)> {

View File

@ -25,7 +25,7 @@ instances as clients.
use prelude::*;
use c_str::ToCStr;
use rt::rtio::{IoFactory, RtioUnixListener, with_local_io};
use rt::rtio::{IoFactory, LocalIo, RtioUnixListener};
use rt::rtio::{RtioUnixAcceptor, RtioPipe};
use io::pipe::PipeStream;
use io::{io_error, Listener, Acceptor, Reader, Writer};
@ -59,15 +59,14 @@ impl UnixStream {
/// stream.write([1, 2, 3]);
///
pub fn connect<P: ToCStr>(path: &P) -> Option<UnixStream> {
with_local_io(|io| {
match io.unix_connect(&path.to_c_str()) {
Ok(s) => Some(UnixStream::new(s)),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
let mut io = LocalIo::borrow();
match io.get().unix_connect(&path.to_c_str()) {
Ok(s) => Some(UnixStream::new(s)),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
})
}
}
}
@ -108,15 +107,14 @@ impl UnixListener {
/// }
///
pub fn bind<P: ToCStr>(path: &P) -> Option<UnixListener> {
with_local_io(|io| {
match io.unix_bind(&path.to_c_str()) {
Ok(s) => Some(UnixListener{ obj: s }),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
let mut io = LocalIo::borrow();
match io.get().unix_bind(&path.to_c_str()) {
Ok(s) => Some(UnixListener{ obj: s }),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
})
}
}
}

View File

@ -17,7 +17,7 @@ use prelude::*;
use super::{Reader, Writer};
use io::{io_error, EndOfFile};
use io::native::file;
use rt::rtio::{RtioPipe, with_local_io};
use rt::rtio::{LocalIo, RtioPipe};
pub struct PipeStream {
priv obj: ~RtioPipe,
@ -44,15 +44,14 @@ impl PipeStream {
/// If the pipe cannot be created, an error will be raised on the
/// `io_error` condition.
pub fn open(fd: file::fd_t) -> Option<PipeStream> {
with_local_io(|io| {
match io.pipe_open(fd) {
Ok(obj) => Some(PipeStream { obj: obj }),
Err(e) => {
io_error::cond.raise(e);
None
}
let mut io = LocalIo::borrow();
match io.get().pipe_open(fd) {
Ok(obj) => Some(PipeStream { obj: obj }),
Err(e) => {
io_error::cond.raise(e);
None
}
})
}
}
pub fn new(inner: ~RtioPipe) -> PipeStream {

View File

@ -11,12 +11,11 @@
//! Bindings for executing child processes
use prelude::*;
use cell::Cell;
use libc;
use io;
use io::io_error;
use rt::rtio::{RtioProcess, IoFactory, with_local_io};
use rt::rtio::{RtioProcess, IoFactory, LocalIo};
use fmt;
@ -120,21 +119,19 @@ impl Process {
/// Creates a new pipe initialized, but not bound to any particular
/// source/destination
pub fn new(config: ProcessConfig) -> Option<Process> {
let config = Cell::new(config);
with_local_io(|io| {
match io.spawn(config.take()) {
Ok((p, io)) => Some(Process{
handle: p,
io: io.move_iter().map(|p|
p.map(|p| io::PipeStream::new(p))
).collect()
}),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
let mut io = LocalIo::borrow();
match io.get().spawn(config) {
Ok((p, io)) => Some(Process{
handle: p,
io: io.move_iter().map(|p|
p.map(|p| io::PipeStream::new(p))
).collect()
}),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
})
}
}
/// Returns the process id of this child process

View File

@ -24,9 +24,8 @@ use comm::{Port, SharedChan, stream};
use container::{Map, MutableMap};
use hashmap;
use io::io_error;
use option::{Some, None};
use result::{Err, Ok};
use rt::rtio::{IoFactory, RtioSignal, with_local_io};
use rt::rtio::{IoFactory, LocalIo, RtioSignal};
#[repr(int)]
#[deriving(Eq, IterBytes)]
@ -123,18 +122,17 @@ impl Listener {
if self.handles.contains_key(&signum) {
return true; // self is already listening to signum, so succeed
}
with_local_io(|io| {
match io.signal(signum, self.chan.clone()) {
Ok(w) => {
self.handles.insert(signum, w);
Some(())
},
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
let mut io = LocalIo::borrow();
match io.get().signal(signum, self.chan.clone()) {
Ok(w) => {
self.handles.insert(signum, w);
true
},
Err(ioerr) => {
io_error::cond.raise(ioerr);
false
}
}).is_some()
}
}
/// Unregisters a signal. If this listener currently had a handler

View File

@ -31,7 +31,7 @@ use libc;
use option::{Option, Some, None};
use result::{Ok, Err};
use io::buffered::LineBufferedWriter;
use rt::rtio::{IoFactory, RtioTTY, RtioFileStream, with_local_io, DontClose};
use rt::rtio::{IoFactory, RtioTTY, RtioFileStream, DontClose};
use super::{Reader, Writer, io_error, IoError, OtherIoError,
standard_error, EndOfFile};
@ -69,12 +69,19 @@ enum StdSource {
}
fn src<T>(fd: libc::c_int, readable: bool, f: |StdSource| -> T) -> T {
with_local_io(|io| {
match io.tty_open(fd, readable) {
Ok(tty) => Some(f(TTY(tty))),
Err(_) => Some(f(File(io.fs_from_raw_fd(fd, DontClose)))),
let mut io = LocalIo::borrow();
match io.get().tty_open(fd, readable) {
Ok(tty) => f(TTY(tty)),
Err(_) => {
// It's not really that desirable if these handles are closed
// synchronously, and because they're squirreled away in a task
// structure the destructors will be run when the task is
// attempted to get destroyed. This means that if we run a
// synchronous destructor we'll attempt to do some scheduling
// operations which will just result in sadness.
f(File(io.get().fs_from_raw_fd(fd, DontClose)))
}
}).unwrap()
}
}
/// Creates a new non-blocking handle to the stdin of the current process.

View File

@ -42,7 +42,7 @@ use comm::{Port, PortOne};
use option::{Option, Some, None};
use result::{Ok, Err};
use io::io_error;
use rt::rtio::{IoFactory, RtioTimer, with_local_io};
use rt::rtio::{IoFactory, LocalIo, RtioTimer};
pub struct Timer {
priv obj: ~RtioTimer
@ -60,17 +60,15 @@ impl Timer {
/// for a number of milliseconds, or to possibly create channels which will
/// get notified after an amount of time has passed.
pub fn new() -> Option<Timer> {
with_local_io(|io| {
match io.timer_init() {
Ok(t) => Some(Timer { obj: t }),
Err(ioerr) => {
debug!("Timer::init: failed to init: {:?}", ioerr);
io_error::cond.raise(ioerr);
None
}
let mut io = LocalIo::borrow();
match io.get().timer_init() {
Ok(t) => Some(Timer { obj: t }),
Err(ioerr) => {
debug!("Timer::init: failed to init: {:?}", ioerr);
io_error::cond.raise(ioerr);
None
}
})
}
}
/// Blocks the current task for `msecs` milliseconds.

View File

@ -159,8 +159,11 @@ impl EventLoop for BasicLoop {
~BasicRemote::new(self.messages.clone(), id) as ~RemoteCallback
}
fn io<'a>(&'a mut self, f: |&'a mut IoFactory|) {
f(self.io)
fn io(&mut self) -> &'static mut IoFactory:'static {
unsafe {
let factory: &mut IoFactory = self.io;
cast::transmute(factory)
}
}
}

View File

@ -9,15 +9,19 @@
// except according to those terms.
use c_str::CString;
use cast;
use comm::{SharedChan, PortOne, Port};
use libc::c_int;
use libc;
use ops::Drop;
use option::*;
use path::Path;
use result::*;
use ai = io::net::addrinfo;
use io::IoError;
use io::native::NATIVE_IO_FACTORY;
use io::native;
use io::net::ip::{IpAddr, SocketAddr};
use io::process::{ProcessConfig, ProcessExit};
use io::signal::Signum;
@ -34,9 +38,8 @@ pub trait EventLoop {
fn pausible_idle_callback(&mut self, ~Callback) -> ~PausibleIdleCallback;
fn remote_callback(&mut self, ~Callback) -> ~RemoteCallback;
/// The asynchronous I/O services. Not all event loops may provide one
// FIXME(#9382) this is an awful interface
fn io<'a>(&'a mut self, f: |&'a mut IoFactory|);
/// The asynchronous I/O services. Not all event loops may provide one.
fn io(&mut self) -> &'static mut IoFactory:'static;
}
pub trait RemoteCallback {
@ -75,31 +78,53 @@ pub enum CloseBehavior {
CloseAsynchronously,
}
pub fn with_local_io<T>(f: |&mut IoFactory| -> Option<T>) -> Option<T> {
use rt::sched::Scheduler;
use rt::local::Local;
use io::native;
pub struct LocalIo {
factory: &'static mut IoFactory:'static,
}
unsafe {
// First, attempt to use the local scheduler's I/O services
let sched: Option<*mut Scheduler> = Local::try_unsafe_borrow();
match sched {
Some(sched) => {
let mut io = None;
(*sched).event_loop.io(|i| io = Some(i));
match io {
Some(io) => return f(io),
None => {}
#[unsafe_destructor]
impl Drop for LocalIo {
fn drop(&mut self) {
// XXX(pcwalton): Do nothing here for now, but eventually we may want
// something. For now this serves to make `LocalIo` noncopyable.
}
}
impl LocalIo {
/// Returns the local I/O: either the local scheduler's I/O services or
/// the native I/O services.
pub fn borrow() -> LocalIo {
use rt::sched::Scheduler;
use rt::local::Local;
unsafe {
// First, attempt to use the local scheduler's I/O services
let sched: Option<*mut Scheduler> = Local::try_unsafe_borrow();
match sched {
Some(sched) => {
return LocalIo {
factory: (*sched).event_loop.io(),
}
}
None => {}
}
// If we don't have a scheduler or the scheduler doesn't have I/O
// services, then fall back to the native I/O services.
let native_io: &'static mut native::IoFactory =
&mut NATIVE_IO_FACTORY;
LocalIo {
factory: native_io as &mut IoFactory:'static
}
None => {}
}
}
// If we don't have a scheduler or the scheduler doesn't have I/O services,
// then fall back to the native I/O services.
let mut io = native::IoFactory;
f(&mut io as &mut IoFactory)
/// Returns the underlying I/O factory as a trait reference.
#[inline]
pub fn get(&mut self) -> &'static mut IoFactory {
unsafe {
cast::transmute_copy(&self.factory)
}
}
}
pub trait IoFactory {