From 0cad9847652088b35ee4c13c04539ca3a67611f7 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 16 Oct 2013 14:48:05 -0700 Subject: [PATCH] Migrate Rtio objects to true trait objects This moves as many as I could over to ~Trait instead of ~Typedef. The only remaining one is the IoFactoryObject which should be coming soon... --- src/libstd/rt/io/net/tcp.rs | 13 +++---- src/libstd/rt/io/net/udp.rs | 4 +- src/libstd/rt/io/net/unix.rs | 9 ++--- src/libstd/rt/io/pipe.rs | 6 +-- src/libstd/rt/io/process.rs | 4 +- src/libstd/rt/io/stdio.rs | 6 +-- src/libstd/rt/io/timer.rs | 5 +-- src/libstd/rt/mod.rs | 4 +- src/libstd/rt/rtio.rs | 46 ++++++++++------------- src/libstd/rt/sched.rs | 12 +++--- src/libstd/rt/test.rs | 5 ++- src/libstd/rt/uv/uvio.rs | 73 +++++++++++++++++++----------------- src/libstd/task/spawn.rs | 4 +- 13 files changed, 92 insertions(+), 99 deletions(-) diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index f29e17cfc2f..946ecbea6f7 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -13,19 +13,16 @@ use result::{Ok, Err}; use rt::io::net::ip::SocketAddr; use rt::io::{Reader, Writer, Listener, Acceptor}; use rt::io::{io_error, read_error, EndOfFile}; -use rt::rtio::{IoFactory, IoFactoryObject, - RtioSocket, - RtioTcpListener, RtioTcpListenerObject, - RtioTcpAcceptor, RtioTcpAcceptorObject, - RtioTcpStream, RtioTcpStreamObject}; +use rt::rtio::{IoFactory, IoFactoryObject, RtioTcpListenerObject, + RtioSocket, RtioTcpListener, RtioTcpAcceptor, RtioTcpStream}; use rt::local::Local; pub struct TcpStream { - priv obj: ~RtioTcpStreamObject + priv obj: ~RtioTcpStream } impl TcpStream { - fn new(s: ~RtioTcpStreamObject) -> TcpStream { + fn new(s: ~RtioTcpStream) -> TcpStream { TcpStream { obj: s } } @@ -142,7 +139,7 @@ impl Listener for TcpListener { } pub struct TcpAcceptor { - priv obj: ~RtioTcpAcceptorObject + priv obj: ~RtioTcpAcceptor } impl Acceptor for TcpAcceptor { diff --git a/src/libstd/rt/io/net/udp.rs b/src/libstd/rt/io/net/udp.rs index 27faae0838b..ed01dc9dcda 100644 --- a/src/libstd/rt/io/net/udp.rs +++ b/src/libstd/rt/io/net/udp.rs @@ -13,11 +13,11 @@ use result::{Ok, Err}; use rt::io::net::ip::SocketAddr; use rt::io::{Reader, Writer}; use rt::io::{io_error, read_error, EndOfFile}; -use rt::rtio::{RtioSocket, RtioUdpSocketObject, RtioUdpSocket, IoFactory, IoFactoryObject}; +use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory, IoFactoryObject}; use rt::local::Local; pub struct UdpSocket { - priv obj: ~RtioUdpSocketObject + priv obj: ~RtioUdpSocket } impl UdpSocket { diff --git a/src/libstd/rt/io/net/unix.rs b/src/libstd/rt/io/net/unix.rs index 9428c1f800d..1394cdb04a8 100644 --- a/src/libstd/rt/io/net/unix.rs +++ b/src/libstd/rt/io/net/unix.rs @@ -25,9 +25,8 @@ instances as clients. use prelude::*; use super::super::support::PathLike; -use rt::rtio::{IoFactory, IoFactoryObject, RtioUnixListenerObject}; -use rt::rtio::{RtioUnixAcceptorObject, RtioPipeObject, RtioUnixListener}; -use rt::rtio::RtioUnixAcceptor; +use rt::rtio::{IoFactory, IoFactoryObject, RtioUnixListener}; +use rt::rtio::{RtioUnixAcceptor, RtioPipe, RtioUnixListenerObject}; use rt::io::pipe::PipeStream; use rt::io::{io_error, Listener, Acceptor, Reader, Writer}; use rt::local::Local; @@ -38,7 +37,7 @@ pub struct UnixStream { } impl UnixStream { - fn new(obj: ~RtioPipeObject) -> UnixStream { + fn new(obj: ~RtioPipe) -> UnixStream { UnixStream { obj: PipeStream::new_bound(obj) } } @@ -141,7 +140,7 @@ impl Listener for UnixListener { } pub struct UnixAcceptor { - priv obj: ~RtioUnixAcceptorObject, + priv obj: ~RtioUnixAcceptor, } impl Acceptor for UnixAcceptor { diff --git a/src/libstd/rt/io/pipe.rs b/src/libstd/rt/io/pipe.rs index 67e04f57f4f..c15fbc79da9 100644 --- a/src/libstd/rt/io/pipe.rs +++ b/src/libstd/rt/io/pipe.rs @@ -16,14 +16,14 @@ use prelude::*; use super::{Reader, Writer}; use rt::io::{io_error, read_error, EndOfFile}; -use rt::rtio::{RtioPipe, RtioPipeObject}; +use rt::rtio::RtioPipe; pub struct PipeStream { - priv obj: ~RtioPipeObject + priv obj: ~RtioPipe, } impl PipeStream { - pub fn new_bound(inner: ~RtioPipeObject) -> PipeStream { + pub fn new_bound(inner: ~RtioPipe) -> PipeStream { PipeStream { obj: inner } } } diff --git a/src/libstd/rt/io/process.rs b/src/libstd/rt/io/process.rs index f6e8b87344f..c13b275ae52 100644 --- a/src/libstd/rt/io/process.rs +++ b/src/libstd/rt/io/process.rs @@ -16,7 +16,7 @@ use libc; use rt::io; use rt::io::io_error; use rt::local::Local; -use rt::rtio::{RtioProcess, RtioProcessObject, IoFactoryObject, IoFactory}; +use rt::rtio::{RtioProcess, IoFactoryObject, IoFactory}; // windows values don't matter as long as they're at least one of unix's // TERM/KILL/INT signals @@ -26,7 +26,7 @@ use rt::rtio::{RtioProcess, RtioProcessObject, IoFactoryObject, IoFactory}; #[cfg(not(windows))] pub static MustDieSignal: int = libc::SIGKILL as int; pub struct Process { - priv handle: ~RtioProcessObject, + priv handle: ~RtioProcess, io: ~[Option], } diff --git a/src/libstd/rt/io/stdio.rs b/src/libstd/rt/io/stdio.rs index 0bc87c77a9c..77ac87830e2 100644 --- a/src/libstd/rt/io/stdio.rs +++ b/src/libstd/rt/io/stdio.rs @@ -13,7 +13,7 @@ use libc; use option::{Option, Some, None}; use result::{Ok, Err}; use rt::local::Local; -use rt::rtio::{IoFactoryObject, IoFactory, RtioTTYObject, RtioTTY}; +use rt::rtio::{IoFactoryObject, IoFactory, RtioTTY}; use super::{Reader, Writer, io_error}; /// Creates a new non-blocking handle to the stdin of the current process. @@ -87,7 +87,7 @@ pub fn println_args(fmt: &fmt::Arguments) { /// Representation of a reader of a standard input stream pub struct StdReader { - priv inner: ~RtioTTYObject + priv inner: ~RtioTTY } impl StdReader { @@ -129,7 +129,7 @@ impl Reader for StdReader { /// Representation of a writer to a standard output stream pub struct StdWriter { - priv inner: ~RtioTTYObject + priv inner: ~RtioTTY } impl StdWriter { diff --git a/src/libstd/rt/io/timer.rs b/src/libstd/rt/io/timer.rs index b41d7541a60..7d13e034dc1 100644 --- a/src/libstd/rt/io/timer.rs +++ b/src/libstd/rt/io/timer.rs @@ -11,12 +11,11 @@ use option::{Option, Some, None}; use result::{Ok, Err}; use rt::io::{io_error}; -use rt::rtio::{IoFactory, IoFactoryObject, - RtioTimer, RtioTimerObject}; +use rt::rtio::{IoFactory, IoFactoryObject, RtioTimer}; use rt::local::Local; pub struct Timer { - priv obj: ~RtioTimerObject + priv obj: ~RtioTimer } /// Sleep the current task for `msecs` milliseconds. diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 9ea7b734d24..66d7a6bf488 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -279,7 +279,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { rtdebug!("inserting a regular scheduler"); // Every scheduler is driven by an I/O event loop. - let loop_ = ~UvEventLoop::new(); + let loop_ = ~UvEventLoop::new() as ~rtio::EventLoop; let mut sched = ~Scheduler::new(loop_, work_queue.clone(), work_queues.clone(), @@ -303,7 +303,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { // set. let work_queue = WorkQueue::new(); - let main_loop = ~UvEventLoop::new(); + let main_loop = ~UvEventLoop::new() as ~rtio::EventLoop; let mut main_sched = ~Scheduler::new_special(main_loop, work_queue, work_queues.clone(), diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 3c513e263f1..ef695130e22 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -23,31 +23,18 @@ use super::io::support::PathLike; use super::io::{SeekStyle}; use super::io::{FileMode, FileAccess, FileStat}; -// XXX: ~object doesn't work currently so these are some placeholder -// types to use instead -pub type EventLoopObject = uvio::UvEventLoop; -pub type RemoteCallbackObject = uvio::UvRemoteCallback; -pub type IoFactoryObject = uvio::UvIoFactory; -pub type RtioTcpStreamObject = uvio::UvTcpStream; -pub type RtioTcpAcceptorObject = uvio::UvTcpAcceptor; +// FIXME(#9893) cannot call by-value self method on a trait object pub type RtioTcpListenerObject = uvio::UvTcpListener; -pub type RtioUdpSocketObject = uvio::UvUdpSocket; -pub type RtioTimerObject = uvio::UvTimer; -pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback; -pub type RtioPipeObject = uvio::UvPipeStream; -pub type RtioProcessObject = uvio::UvProcess; pub type RtioUnixListenerObject = uvio::UvUnixListener; -pub type RtioUnixAcceptorObject = uvio::UvUnixAcceptor; -pub type RtioTTYObject = uvio::UvTTY; pub trait EventLoop { fn run(&mut self); fn callback(&mut self, ~fn()); fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback; fn callback_ms(&mut self, ms: u64, ~fn()); - fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject; + fn remote_callback(&mut self, ~fn()) -> ~RemoteCallback; /// The asynchronous I/O services. Not all event loops may provide one - fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>; + fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory>; } pub trait RemoteCallback { @@ -73,10 +60,10 @@ pub struct FileOpenConfig { } pub trait IoFactory { - fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError>; + fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStream, IoError>; fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError>; - fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError>; - fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError>; + fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocket, IoError>; + fn timer_init(&mut self) -> Result<~RtioTimer, IoError>; fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream; fn fs_open(&mut self, path: &P, fm: FileMode, fa: FileAccess) -> Result<~RtioFileStream, IoError>; @@ -89,22 +76,22 @@ pub trait IoFactory { fn fs_readdir(&mut self, path: &P, flags: c_int) -> Result<~[Path], IoError>; fn spawn(&mut self, config: ProcessConfig) - -> Result<(~RtioProcessObject, ~[Option<~RtioPipeObject>]), IoError>; + -> Result<(~RtioProcess, ~[Option<~RtioPipe>]), IoError>; fn unix_bind(&mut self, path: &P) -> Result<~RtioUnixListenerObject, IoError>; fn unix_connect(&mut self, path: &P) -> - Result<~RtioPipeObject, IoError>; + Result<~RtioPipe, IoError>; fn tty_open(&mut self, fd: c_int, readable: bool, close_on_drop: bool) - -> Result<~RtioTTYObject, IoError>; + -> Result<~RtioTTY, IoError>; } pub trait RtioTcpListener : RtioSocket { - fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError>; + fn listen(self) -> Result<~RtioTcpAcceptor, IoError>; } pub trait RtioTcpAcceptor : RtioSocket { - fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>; + fn accept(&mut self) -> Result<~RtioTcpStream, IoError>; fn accept_simultaneously(&mut self) -> Result<(), IoError>; fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>; } @@ -166,11 +153,11 @@ pub trait RtioPipe { } pub trait RtioUnixListener { - fn listen(self) -> Result<~RtioUnixAcceptorObject, IoError>; + fn listen(self) -> Result<~RtioUnixAcceptor, IoError>; } pub trait RtioUnixAcceptor { - fn accept(&mut self) -> Result<~RtioPipeObject, IoError>; + fn accept(&mut self) -> Result<~RtioPipe, IoError>; fn accept_simultaneously(&mut self) -> Result<(), IoError>; fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>; } @@ -182,3 +169,10 @@ pub trait RtioTTY { fn reset_mode(&mut self); fn get_winsize(&mut self) -> Result<(int, int), IoError>; } + +pub trait PausibleIdleCallback { + fn start(&mut self, f: ~fn()); + fn pause(&mut self); + fn resume(&mut self); + fn close(&mut self); +} diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index ee163bab3c0..464e2b2c4c2 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -16,7 +16,7 @@ use unstable::raw; use super::sleeper_list::SleeperList; use super::work_queue::WorkQueue; use super::stack::{StackPool}; -use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject}; +use super::rtio::EventLoop; use super::context::Context; use super::task::{Task, AnySched, Sched}; use super::message_queue::MessageQueue; @@ -63,7 +63,7 @@ pub struct Scheduler { no_sleep: bool, stack_pool: StackPool, /// The event loop used to drive the scheduler and perform I/O - event_loop: ~EventLoopObject, + event_loop: ~EventLoop, /// The scheduler runs on a special task. When it is not running /// it is stored here instead of the work queue. priv sched_task: Option<~Task>, @@ -107,7 +107,7 @@ impl Scheduler { // * Initialization Functions - pub fn new(event_loop: ~EventLoopObject, + pub fn new(event_loop: ~EventLoop, work_queue: WorkQueue<~Task>, work_queues: ~[WorkQueue<~Task>], sleeper_list: SleeperList) @@ -119,7 +119,7 @@ impl Scheduler { } - pub fn new_special(event_loop: ~EventLoopObject, + pub fn new_special(event_loop: ~EventLoop, work_queue: WorkQueue<~Task>, work_queues: ~[WorkQueue<~Task>], sleeper_list: SleeperList, @@ -227,7 +227,7 @@ impl Scheduler { // mutable reference to the event_loop to give it the "run" // command. unsafe { - let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; + let event_loop: *mut ~EventLoop = &mut self_sched.event_loop; // Our scheduler must be in the task before the event loop // is started. @@ -793,7 +793,7 @@ pub enum SchedMessage { } pub struct SchedHandle { - priv remote: ~RemoteCallbackObject, + priv remote: ~RemoteCallback, priv queue: MessageQueue, sched_id: uint } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index 759550e5cbd..66d3f3de6ec 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -22,6 +22,7 @@ use super::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr}; use vec::{OwnedVector, MutableVector, ImmutableVector}; use path::GenericPath; use rt::sched::Scheduler; +use rt::rtio::EventLoop; use unstable::{run_in_bare_thread}; use rt::thread::Thread; use rt::task::Task; @@ -36,7 +37,7 @@ pub fn new_test_uv_sched() -> Scheduler { let queue = WorkQueue::new(); let queues = ~[queue.clone()]; - let mut sched = Scheduler::new(~UvEventLoop::new(), + let mut sched = Scheduler::new(~UvEventLoop::new() as ~EventLoop, queue, queues, SleeperList::new()); @@ -195,7 +196,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { } for i in range(0u, nthreads) { - let loop_ = ~UvEventLoop::new(); + let loop_ = ~UvEventLoop::new() as ~EventLoop; let mut sched = ~Scheduler::new(loop_, work_queues[i].clone(), work_queues.clone(), diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index f1a5916ee13..00572d66573 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -215,11 +215,11 @@ impl EventLoop for UvEventLoop { fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback { let idle_watcher = IdleWatcher::new(self.uvio.uv_loop()); - return ~UvPausibleIdleCallback { + ~UvPausibleIdleCallback { watcher: idle_watcher, idle_flag: false, closed: false - }; + } as ~PausibleIdleCallback } fn callback_ms(&mut self, ms: u64, f: ~fn()) { @@ -231,12 +231,12 @@ impl EventLoop for UvEventLoop { } } - fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject { - ~UvRemoteCallback::new(self.uvio.uv_loop(), f) + fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallback{ + ~UvRemoteCallback::new(self.uvio.uv_loop(), f) as ~RemoteCallback } - fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> { - Some(&mut self.uvio) + fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> { + Some(&mut self.uvio as &mut IoFactory) } } @@ -246,30 +246,30 @@ pub struct UvPausibleIdleCallback { priv closed: bool } -impl UvPausibleIdleCallback { +impl RtioPausibleIdleCallback for UvPausibleIdleCallback { #[inline] - pub fn start(&mut self, f: ~fn()) { + fn start(&mut self, f: ~fn()) { do self.watcher.start |_idle_watcher, _status| { f(); }; self.idle_flag = true; } #[inline] - pub fn pause(&mut self) { + fn pause(&mut self) { if self.idle_flag == true { self.watcher.stop(); self.idle_flag = false; } } #[inline] - pub fn resume(&mut self) { + fn resume(&mut self) { if self.idle_flag == false { self.watcher.restart(); self.idle_flag = true; } } #[inline] - pub fn close(&mut self) { + fn close(&mut self) { self.pause(); if !self.closed { self.closed = true; @@ -447,11 +447,11 @@ impl IoFactory for UvIoFactory { // Connect to an address and return a new stream // NB: This blocks the task waiting on the connection. // It would probably be better to return a future - fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError> { + fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStream, IoError> { // Create a cell in the task to hold the result. We will fill // the cell before resuming the task. let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; + let result_cell_ptr: *Cell> = &result_cell; // Block this task and take ownership, switch to scheduler context do task::unkillable { // FIXME(#8674) @@ -467,7 +467,8 @@ impl IoFactory for UvIoFactory { None => { let tcp = NativeHandle::from_native_handle(stream.native_handle()); let home = get_handle_to_current_scheduler!(); - let res = Ok(~UvTcpStream { watcher: tcp, home: home }); + let res = Ok(~UvTcpStream { watcher: tcp, home: home } + as ~RtioTcpStream); // Store the stream in the task's stack unsafe { (*result_cell_ptr).put_back(res); } @@ -517,12 +518,12 @@ impl IoFactory for UvIoFactory { } } - fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> { + fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocket, IoError> { let mut watcher = UdpWatcher::new(self.uv_loop()); match watcher.bind(addr) { Ok(_) => { let home = get_handle_to_current_scheduler!(); - Ok(~UvUdpSocket { watcher: watcher, home: home }) + Ok(~UvUdpSocket { watcher: watcher, home: home } as ~RtioUdpSocket) } Err(uverr) => { do task::unkillable { // FIXME(#8674) @@ -540,10 +541,10 @@ impl IoFactory for UvIoFactory { } } - fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> { + fn timer_init(&mut self) -> Result<~RtioTimer, IoError> { let watcher = TimerWatcher::new(self.uv_loop()); let home = get_handle_to_current_scheduler!(); - Ok(~UvTimer::new(watcher, home)) + Ok(~UvTimer::new(watcher, home) as ~RtioTimer) } fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream { @@ -750,7 +751,7 @@ impl IoFactory for UvIoFactory { } fn spawn(&mut self, config: ProcessConfig) - -> Result<(~RtioProcessObject, ~[Option<~RtioPipeObject>]), IoError> + -> Result<(~RtioProcess, ~[Option<~RtioPipe>]), IoError> { // Sadly, we must create the UvProcess before we actually call uv_spawn // so that the exit_cb can close over it and notify it when the process @@ -792,7 +793,8 @@ impl IoFactory for UvIoFactory { Ok(io) => { // Only now do we actually get a handle to this scheduler. ret.home = Some(get_handle_to_current_scheduler!()); - Ok((ret, io)) + Ok((ret as ~RtioProcess, + io.move_iter().map(|p| p.map(|p| p as ~RtioPipe)).collect())) } Err(uverr) => { // We still need to close the process handle we created, but @@ -827,12 +829,12 @@ impl IoFactory for UvIoFactory { } fn unix_connect(&mut self, path: &P) -> - Result<~RtioPipeObject, IoError> + Result<~RtioPipe, IoError> { let scheduler: ~Scheduler = Local::take(); let mut pipe = Pipe::new(self.uv_loop(), false); let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell> = &result_cell; + let result_cell_ptr: *Cell> = &result_cell; do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); @@ -845,7 +847,7 @@ impl IoFactory for UvIoFactory { handle as *uvll::uv_pipe_t); let home = get_handle_to_current_scheduler!(); let pipe = UvUnboundPipe::new(pipe, home); - Ok(~UvPipeStream::new(pipe)) + Ok(~UvPipeStream::new(pipe) as ~RtioPipe) } Some(e) => { Err(uv_error_to_io_error(e)) } }; @@ -871,13 +873,13 @@ impl IoFactory for UvIoFactory { } fn tty_open(&mut self, fd: c_int, readable: bool, close_on_drop: bool) - -> Result<~RtioTTYObject, IoError> { + -> Result<~RtioTTY, IoError> { match tty::TTY::new(self.uv_loop(), fd, readable) { Ok(tty) => Ok(~UvTTY { home: get_handle_to_current_scheduler!(), tty: tty, close_on_drop: close_on_drop, - }), + } as ~RtioTTY), Err(e) => Err(uv_error_to_io_error(e)) } } @@ -921,7 +923,7 @@ impl RtioSocket for UvTcpListener { } impl RtioTcpListener for UvTcpListener { - fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError> { + fn listen(self) -> Result<~RtioTcpAcceptor, IoError> { do self.home_for_io_consume |self_| { let acceptor = ~UvTcpAcceptor::new(self_); let incoming = Cell::new(acceptor.incoming.clone()); @@ -935,14 +937,15 @@ impl RtioTcpListener for UvTcpListener { // first accept call in the callback guarenteed to succeed server.accept(inc.as_stream()); let home = get_handle_to_current_scheduler!(); - Ok(~UvTcpStream { watcher: inc, home: home }) + Ok(~UvTcpStream { watcher: inc, home: home } + as ~RtioTcpStream) } }; incoming.send(inc); } }; match res { - Ok(()) => Ok(acceptor), + Ok(()) => Ok(acceptor as ~RtioTcpAcceptor), Err(e) => Err(uv_error_to_io_error(e)), } } @@ -951,7 +954,7 @@ impl RtioTcpListener for UvTcpListener { pub struct UvTcpAcceptor { priv listener: UvTcpListener, - priv incoming: Tube>, + priv incoming: Tube>, } impl HomingIO for UvTcpAcceptor { @@ -984,7 +987,7 @@ fn accept_simultaneously(stream: StreamWatcher, a: int) -> Result<(), IoError> { } impl RtioTcpAcceptor for UvTcpAcceptor { - fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> { + fn accept(&mut self) -> Result<~RtioTcpStream, IoError> { do self.home_for_io |self_| { self_.incoming.recv() } @@ -1718,7 +1721,7 @@ impl UvUnixListener { } impl RtioUnixListener for UvUnixListener { - fn listen(self) -> Result<~RtioUnixAcceptorObject, IoError> { + fn listen(self) -> Result<~RtioUnixAcceptor, IoError> { do self.home_for_io_consume |self_| { let acceptor = ~UvUnixAcceptor::new(self_); let incoming = Cell::new(acceptor.incoming.clone()); @@ -1732,14 +1735,14 @@ impl RtioUnixListener for UvUnixListener { server.accept(inc.as_stream()); let home = get_handle_to_current_scheduler!(); let pipe = UvUnboundPipe::new(inc, home); - Ok(~UvPipeStream::new(pipe)) + Ok(~UvPipeStream::new(pipe) as ~RtioPipe) } }; incoming.send(inc); } }; match res { - Ok(()) => Ok(acceptor), + Ok(()) => Ok(acceptor as ~RtioUnixAcceptor), Err(e) => Err(uv_error_to_io_error(e)), } } @@ -1776,7 +1779,7 @@ impl Drop for UvTTY { pub struct UvUnixAcceptor { listener: UvUnixListener, - incoming: Tube>, + incoming: Tube>, } impl HomingIO for UvUnixAcceptor { @@ -1790,7 +1793,7 @@ impl UvUnixAcceptor { } impl RtioUnixAcceptor for UvUnixAcceptor { - fn accept(&mut self) -> Result<~RtioPipeObject, IoError> { + fn accept(&mut self) -> Result<~RtioPipe, IoError> { do self.home_for_io |self_| { self_.incoming.recv() } diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index dec13eded39..fbe2988f77c 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -89,7 +89,7 @@ use unstable::sync::Exclusive; use rt::in_green_task_context; use rt::local::Local; use rt::task::{Task, Sched}; -use rt::shouldnt_be_public::{Scheduler, KillHandle, WorkQueue, Thread}; +use rt::shouldnt_be_public::{Scheduler, KillHandle, WorkQueue, Thread, EventLoop}; use rt::uv::uvio::UvEventLoop; #[cfg(test)] use task::default_task_opts; @@ -607,7 +607,7 @@ pub fn spawn_raw(mut opts: TaskOpts, f: ~fn()) { let work_queue = WorkQueue::new(); // Create a new scheduler to hold the new task - let new_loop = ~UvEventLoop::new(); + let new_loop = ~UvEventLoop::new() as ~EventLoop; let mut new_sched = ~Scheduler::new_special(new_loop, work_queue, (*sched).work_queues.clone(),