Migrate Rtio objects to true trait objects

This moves as many as I could over to ~Trait instead of ~Typedef. The only
remaining one is the IoFactoryObject which should be coming soon...
This commit is contained in:
Alex Crichton 2013-10-16 14:48:05 -07:00
parent 35756fbcf6
commit 0cad984765
13 changed files with 92 additions and 99 deletions

View File

@ -13,19 +13,16 @@ use result::{Ok, Err};
use rt::io::net::ip::SocketAddr;
use rt::io::{Reader, Writer, Listener, Acceptor};
use rt::io::{io_error, read_error, EndOfFile};
use rt::rtio::{IoFactory, IoFactoryObject,
RtioSocket,
RtioTcpListener, RtioTcpListenerObject,
RtioTcpAcceptor, RtioTcpAcceptorObject,
RtioTcpStream, RtioTcpStreamObject};
use rt::rtio::{IoFactory, IoFactoryObject, RtioTcpListenerObject,
RtioSocket, RtioTcpListener, RtioTcpAcceptor, RtioTcpStream};
use rt::local::Local;
pub struct TcpStream {
priv obj: ~RtioTcpStreamObject
priv obj: ~RtioTcpStream
}
impl TcpStream {
fn new(s: ~RtioTcpStreamObject) -> TcpStream {
fn new(s: ~RtioTcpStream) -> TcpStream {
TcpStream { obj: s }
}
@ -142,7 +139,7 @@ impl Listener<TcpStream, TcpAcceptor> for TcpListener {
}
pub struct TcpAcceptor {
priv obj: ~RtioTcpAcceptorObject
priv obj: ~RtioTcpAcceptor
}
impl Acceptor<TcpStream> for TcpAcceptor {

View File

@ -13,11 +13,11 @@ use result::{Ok, Err};
use rt::io::net::ip::SocketAddr;
use rt::io::{Reader, Writer};
use rt::io::{io_error, read_error, EndOfFile};
use rt::rtio::{RtioSocket, RtioUdpSocketObject, RtioUdpSocket, IoFactory, IoFactoryObject};
use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory, IoFactoryObject};
use rt::local::Local;
pub struct UdpSocket {
priv obj: ~RtioUdpSocketObject
priv obj: ~RtioUdpSocket
}
impl UdpSocket {

View File

@ -25,9 +25,8 @@ instances as clients.
use prelude::*;
use super::super::support::PathLike;
use rt::rtio::{IoFactory, IoFactoryObject, RtioUnixListenerObject};
use rt::rtio::{RtioUnixAcceptorObject, RtioPipeObject, RtioUnixListener};
use rt::rtio::RtioUnixAcceptor;
use rt::rtio::{IoFactory, IoFactoryObject, RtioUnixListener};
use rt::rtio::{RtioUnixAcceptor, RtioPipe, RtioUnixListenerObject};
use rt::io::pipe::PipeStream;
use rt::io::{io_error, Listener, Acceptor, Reader, Writer};
use rt::local::Local;
@ -38,7 +37,7 @@ pub struct UnixStream {
}
impl UnixStream {
fn new(obj: ~RtioPipeObject) -> UnixStream {
fn new(obj: ~RtioPipe) -> UnixStream {
UnixStream { obj: PipeStream::new_bound(obj) }
}
@ -141,7 +140,7 @@ impl Listener<UnixStream, UnixAcceptor> for UnixListener {
}
pub struct UnixAcceptor {
priv obj: ~RtioUnixAcceptorObject,
priv obj: ~RtioUnixAcceptor,
}
impl Acceptor<UnixStream> for UnixAcceptor {

View File

@ -16,14 +16,14 @@
use prelude::*;
use super::{Reader, Writer};
use rt::io::{io_error, read_error, EndOfFile};
use rt::rtio::{RtioPipe, RtioPipeObject};
use rt::rtio::RtioPipe;
pub struct PipeStream {
priv obj: ~RtioPipeObject
priv obj: ~RtioPipe,
}
impl PipeStream {
pub fn new_bound(inner: ~RtioPipeObject) -> PipeStream {
pub fn new_bound(inner: ~RtioPipe) -> PipeStream {
PipeStream { obj: inner }
}
}

View File

@ -16,7 +16,7 @@ use libc;
use rt::io;
use rt::io::io_error;
use rt::local::Local;
use rt::rtio::{RtioProcess, RtioProcessObject, IoFactoryObject, IoFactory};
use rt::rtio::{RtioProcess, IoFactoryObject, IoFactory};
// windows values don't matter as long as they're at least one of unix's
// TERM/KILL/INT signals
@ -26,7 +26,7 @@ use rt::rtio::{RtioProcess, RtioProcessObject, IoFactoryObject, IoFactory};
#[cfg(not(windows))] pub static MustDieSignal: int = libc::SIGKILL as int;
pub struct Process {
priv handle: ~RtioProcessObject,
priv handle: ~RtioProcess,
io: ~[Option<io::PipeStream>],
}

View File

@ -13,7 +13,7 @@ use libc;
use option::{Option, Some, None};
use result::{Ok, Err};
use rt::local::Local;
use rt::rtio::{IoFactoryObject, IoFactory, RtioTTYObject, RtioTTY};
use rt::rtio::{IoFactoryObject, IoFactory, RtioTTY};
use super::{Reader, Writer, io_error};
/// Creates a new non-blocking handle to the stdin of the current process.
@ -87,7 +87,7 @@ pub fn println_args(fmt: &fmt::Arguments) {
/// Representation of a reader of a standard input stream
pub struct StdReader {
priv inner: ~RtioTTYObject
priv inner: ~RtioTTY
}
impl StdReader {
@ -129,7 +129,7 @@ impl Reader for StdReader {
/// Representation of a writer to a standard output stream
pub struct StdWriter {
priv inner: ~RtioTTYObject
priv inner: ~RtioTTY
}
impl StdWriter {

View File

@ -11,12 +11,11 @@
use option::{Option, Some, None};
use result::{Ok, Err};
use rt::io::{io_error};
use rt::rtio::{IoFactory, IoFactoryObject,
RtioTimer, RtioTimerObject};
use rt::rtio::{IoFactory, IoFactoryObject, RtioTimer};
use rt::local::Local;
pub struct Timer {
priv obj: ~RtioTimerObject
priv obj: ~RtioTimer
}
/// Sleep the current task for `msecs` milliseconds.

View File

@ -279,7 +279,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
rtdebug!("inserting a regular scheduler");
// Every scheduler is driven by an I/O event loop.
let loop_ = ~UvEventLoop::new();
let loop_ = ~UvEventLoop::new() as ~rtio::EventLoop;
let mut sched = ~Scheduler::new(loop_,
work_queue.clone(),
work_queues.clone(),
@ -303,7 +303,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
// set.
let work_queue = WorkQueue::new();
let main_loop = ~UvEventLoop::new();
let main_loop = ~UvEventLoop::new() as ~rtio::EventLoop;
let mut main_sched = ~Scheduler::new_special(main_loop,
work_queue,
work_queues.clone(),

View File

@ -23,31 +23,18 @@ use super::io::support::PathLike;
use super::io::{SeekStyle};
use super::io::{FileMode, FileAccess, FileStat};
// XXX: ~object doesn't work currently so these are some placeholder
// types to use instead
pub type EventLoopObject = uvio::UvEventLoop;
pub type RemoteCallbackObject = uvio::UvRemoteCallback;
pub type IoFactoryObject = uvio::UvIoFactory;
pub type RtioTcpStreamObject = uvio::UvTcpStream;
pub type RtioTcpAcceptorObject = uvio::UvTcpAcceptor;
// FIXME(#9893) cannot call by-value self method on a trait object
pub type RtioTcpListenerObject = uvio::UvTcpListener;
pub type RtioUdpSocketObject = uvio::UvUdpSocket;
pub type RtioTimerObject = uvio::UvTimer;
pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback;
pub type RtioPipeObject = uvio::UvPipeStream;
pub type RtioProcessObject = uvio::UvProcess;
pub type RtioUnixListenerObject = uvio::UvUnixListener;
pub type RtioUnixAcceptorObject = uvio::UvUnixAcceptor;
pub type RtioTTYObject = uvio::UvTTY;
pub trait EventLoop {
fn run(&mut self);
fn callback(&mut self, ~fn());
fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback;
fn callback_ms(&mut self, ms: u64, ~fn());
fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject;
fn remote_callback(&mut self, ~fn()) -> ~RemoteCallback;
/// The asynchronous I/O services. Not all event loops may provide one
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>;
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory>;
}
pub trait RemoteCallback {
@ -73,10 +60,10 @@ pub struct FileOpenConfig {
}
pub trait IoFactory {
fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError>;
fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStream, IoError>;
fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError>;
fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError>;
fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError>;
fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocket, IoError>;
fn timer_init(&mut self) -> Result<~RtioTimer, IoError>;
fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream;
fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess)
-> Result<~RtioFileStream, IoError>;
@ -89,22 +76,22 @@ pub trait IoFactory {
fn fs_readdir<P: PathLike>(&mut self, path: &P, flags: c_int) ->
Result<~[Path], IoError>;
fn spawn(&mut self, config: ProcessConfig)
-> Result<(~RtioProcessObject, ~[Option<~RtioPipeObject>]), IoError>;
-> Result<(~RtioProcess, ~[Option<~RtioPipe>]), IoError>;
fn unix_bind<P: PathLike>(&mut self, path: &P) ->
Result<~RtioUnixListenerObject, IoError>;
fn unix_connect<P: PathLike>(&mut self, path: &P) ->
Result<~RtioPipeObject, IoError>;
Result<~RtioPipe, IoError>;
fn tty_open(&mut self, fd: c_int, readable: bool, close_on_drop: bool)
-> Result<~RtioTTYObject, IoError>;
-> Result<~RtioTTY, IoError>;
}
pub trait RtioTcpListener : RtioSocket {
fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError>;
fn listen(self) -> Result<~RtioTcpAcceptor, IoError>;
}
pub trait RtioTcpAcceptor : RtioSocket {
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>;
fn accept(&mut self) -> Result<~RtioTcpStream, IoError>;
fn accept_simultaneously(&mut self) -> Result<(), IoError>;
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>;
}
@ -166,11 +153,11 @@ pub trait RtioPipe {
}
pub trait RtioUnixListener {
fn listen(self) -> Result<~RtioUnixAcceptorObject, IoError>;
fn listen(self) -> Result<~RtioUnixAcceptor, IoError>;
}
pub trait RtioUnixAcceptor {
fn accept(&mut self) -> Result<~RtioPipeObject, IoError>;
fn accept(&mut self) -> Result<~RtioPipe, IoError>;
fn accept_simultaneously(&mut self) -> Result<(), IoError>;
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>;
}
@ -182,3 +169,10 @@ pub trait RtioTTY {
fn reset_mode(&mut self);
fn get_winsize(&mut self) -> Result<(int, int), IoError>;
}
pub trait PausibleIdleCallback {
fn start(&mut self, f: ~fn());
fn pause(&mut self);
fn resume(&mut self);
fn close(&mut self);
}

View File

@ -16,7 +16,7 @@ use unstable::raw;
use super::sleeper_list::SleeperList;
use super::work_queue::WorkQueue;
use super::stack::{StackPool};
use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject};
use super::rtio::EventLoop;
use super::context::Context;
use super::task::{Task, AnySched, Sched};
use super::message_queue::MessageQueue;
@ -63,7 +63,7 @@ pub struct Scheduler {
no_sleep: bool,
stack_pool: StackPool,
/// The event loop used to drive the scheduler and perform I/O
event_loop: ~EventLoopObject,
event_loop: ~EventLoop,
/// The scheduler runs on a special task. When it is not running
/// it is stored here instead of the work queue.
priv sched_task: Option<~Task>,
@ -107,7 +107,7 @@ impl Scheduler {
// * Initialization Functions
pub fn new(event_loop: ~EventLoopObject,
pub fn new(event_loop: ~EventLoop,
work_queue: WorkQueue<~Task>,
work_queues: ~[WorkQueue<~Task>],
sleeper_list: SleeperList)
@ -119,7 +119,7 @@ impl Scheduler {
}
pub fn new_special(event_loop: ~EventLoopObject,
pub fn new_special(event_loop: ~EventLoop,
work_queue: WorkQueue<~Task>,
work_queues: ~[WorkQueue<~Task>],
sleeper_list: SleeperList,
@ -227,7 +227,7 @@ impl Scheduler {
// mutable reference to the event_loop to give it the "run"
// command.
unsafe {
let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop;
let event_loop: *mut ~EventLoop = &mut self_sched.event_loop;
// Our scheduler must be in the task before the event loop
// is started.
@ -793,7 +793,7 @@ pub enum SchedMessage {
}
pub struct SchedHandle {
priv remote: ~RemoteCallbackObject,
priv remote: ~RemoteCallback,
priv queue: MessageQueue<SchedMessage>,
sched_id: uint
}

View File

@ -22,6 +22,7 @@ use super::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
use vec::{OwnedVector, MutableVector, ImmutableVector};
use path::GenericPath;
use rt::sched::Scheduler;
use rt::rtio::EventLoop;
use unstable::{run_in_bare_thread};
use rt::thread::Thread;
use rt::task::Task;
@ -36,7 +37,7 @@ pub fn new_test_uv_sched() -> Scheduler {
let queue = WorkQueue::new();
let queues = ~[queue.clone()];
let mut sched = Scheduler::new(~UvEventLoop::new(),
let mut sched = Scheduler::new(~UvEventLoop::new() as ~EventLoop,
queue,
queues,
SleeperList::new());
@ -195,7 +196,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
}
for i in range(0u, nthreads) {
let loop_ = ~UvEventLoop::new();
let loop_ = ~UvEventLoop::new() as ~EventLoop;
let mut sched = ~Scheduler::new(loop_,
work_queues[i].clone(),
work_queues.clone(),

View File

@ -215,11 +215,11 @@ impl EventLoop for UvEventLoop {
fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
let idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
return ~UvPausibleIdleCallback {
~UvPausibleIdleCallback {
watcher: idle_watcher,
idle_flag: false,
closed: false
};
} as ~PausibleIdleCallback
}
fn callback_ms(&mut self, ms: u64, f: ~fn()) {
@ -231,12 +231,12 @@ impl EventLoop for UvEventLoop {
}
}
fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject {
~UvRemoteCallback::new(self.uvio.uv_loop(), f)
fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallback{
~UvRemoteCallback::new(self.uvio.uv_loop(), f) as ~RemoteCallback
}
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> {
Some(&mut self.uvio)
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> {
Some(&mut self.uvio as &mut IoFactory)
}
}
@ -246,30 +246,30 @@ pub struct UvPausibleIdleCallback {
priv closed: bool
}
impl UvPausibleIdleCallback {
impl RtioPausibleIdleCallback for UvPausibleIdleCallback {
#[inline]
pub fn start(&mut self, f: ~fn()) {
fn start(&mut self, f: ~fn()) {
do self.watcher.start |_idle_watcher, _status| {
f();
};
self.idle_flag = true;
}
#[inline]
pub fn pause(&mut self) {
fn pause(&mut self) {
if self.idle_flag == true {
self.watcher.stop();
self.idle_flag = false;
}
}
#[inline]
pub fn resume(&mut self) {
fn resume(&mut self) {
if self.idle_flag == false {
self.watcher.restart();
self.idle_flag = true;
}
}
#[inline]
pub fn close(&mut self) {
fn close(&mut self) {
self.pause();
if !self.closed {
self.closed = true;
@ -447,11 +447,11 @@ impl IoFactory for UvIoFactory {
// Connect to an address and return a new stream
// NB: This blocks the task waiting on the connection.
// It would probably be better to return a future
fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError> {
fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStream, IoError> {
// Create a cell in the task to hold the result. We will fill
// the cell before resuming the task.
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
let result_cell_ptr: *Cell<Result<~RtioTcpStream, IoError>> = &result_cell;
// Block this task and take ownership, switch to scheduler context
do task::unkillable { // FIXME(#8674)
@ -467,7 +467,8 @@ impl IoFactory for UvIoFactory {
None => {
let tcp = NativeHandle::from_native_handle(stream.native_handle());
let home = get_handle_to_current_scheduler!();
let res = Ok(~UvTcpStream { watcher: tcp, home: home });
let res = Ok(~UvTcpStream { watcher: tcp, home: home }
as ~RtioTcpStream);
// Store the stream in the task's stack
unsafe { (*result_cell_ptr).put_back(res); }
@ -517,12 +518,12 @@ impl IoFactory for UvIoFactory {
}
}
fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocket, IoError> {
let mut watcher = UdpWatcher::new(self.uv_loop());
match watcher.bind(addr) {
Ok(_) => {
let home = get_handle_to_current_scheduler!();
Ok(~UvUdpSocket { watcher: watcher, home: home })
Ok(~UvUdpSocket { watcher: watcher, home: home } as ~RtioUdpSocket)
}
Err(uverr) => {
do task::unkillable { // FIXME(#8674)
@ -540,10 +541,10 @@ impl IoFactory for UvIoFactory {
}
}
fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
fn timer_init(&mut self) -> Result<~RtioTimer, IoError> {
let watcher = TimerWatcher::new(self.uv_loop());
let home = get_handle_to_current_scheduler!();
Ok(~UvTimer::new(watcher, home))
Ok(~UvTimer::new(watcher, home) as ~RtioTimer)
}
fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream {
@ -750,7 +751,7 @@ impl IoFactory for UvIoFactory {
}
fn spawn(&mut self, config: ProcessConfig)
-> Result<(~RtioProcessObject, ~[Option<~RtioPipeObject>]), IoError>
-> Result<(~RtioProcess, ~[Option<~RtioPipe>]), IoError>
{
// Sadly, we must create the UvProcess before we actually call uv_spawn
// so that the exit_cb can close over it and notify it when the process
@ -792,7 +793,8 @@ impl IoFactory for UvIoFactory {
Ok(io) => {
// Only now do we actually get a handle to this scheduler.
ret.home = Some(get_handle_to_current_scheduler!());
Ok((ret, io))
Ok((ret as ~RtioProcess,
io.move_iter().map(|p| p.map(|p| p as ~RtioPipe)).collect()))
}
Err(uverr) => {
// We still need to close the process handle we created, but
@ -827,12 +829,12 @@ impl IoFactory for UvIoFactory {
}
fn unix_connect<P: PathLike>(&mut self, path: &P) ->
Result<~RtioPipeObject, IoError>
Result<~RtioPipe, IoError>
{
let scheduler: ~Scheduler = Local::take();
let mut pipe = Pipe::new(self.uv_loop(), false);
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<~RtioPipeObject, IoError>> = &result_cell;
let result_cell_ptr: *Cell<Result<~RtioPipe, IoError>> = &result_cell;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
@ -845,7 +847,7 @@ impl IoFactory for UvIoFactory {
handle as *uvll::uv_pipe_t);
let home = get_handle_to_current_scheduler!();
let pipe = UvUnboundPipe::new(pipe, home);
Ok(~UvPipeStream::new(pipe))
Ok(~UvPipeStream::new(pipe) as ~RtioPipe)
}
Some(e) => { Err(uv_error_to_io_error(e)) }
};
@ -871,13 +873,13 @@ impl IoFactory for UvIoFactory {
}
fn tty_open(&mut self, fd: c_int, readable: bool, close_on_drop: bool)
-> Result<~RtioTTYObject, IoError> {
-> Result<~RtioTTY, IoError> {
match tty::TTY::new(self.uv_loop(), fd, readable) {
Ok(tty) => Ok(~UvTTY {
home: get_handle_to_current_scheduler!(),
tty: tty,
close_on_drop: close_on_drop,
}),
} as ~RtioTTY),
Err(e) => Err(uv_error_to_io_error(e))
}
}
@ -921,7 +923,7 @@ impl RtioSocket for UvTcpListener {
}
impl RtioTcpListener for UvTcpListener {
fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError> {
fn listen(self) -> Result<~RtioTcpAcceptor, IoError> {
do self.home_for_io_consume |self_| {
let acceptor = ~UvTcpAcceptor::new(self_);
let incoming = Cell::new(acceptor.incoming.clone());
@ -935,14 +937,15 @@ impl RtioTcpListener for UvTcpListener {
// first accept call in the callback guarenteed to succeed
server.accept(inc.as_stream());
let home = get_handle_to_current_scheduler!();
Ok(~UvTcpStream { watcher: inc, home: home })
Ok(~UvTcpStream { watcher: inc, home: home }
as ~RtioTcpStream)
}
};
incoming.send(inc);
}
};
match res {
Ok(()) => Ok(acceptor),
Ok(()) => Ok(acceptor as ~RtioTcpAcceptor),
Err(e) => Err(uv_error_to_io_error(e)),
}
}
@ -951,7 +954,7 @@ impl RtioTcpListener for UvTcpListener {
pub struct UvTcpAcceptor {
priv listener: UvTcpListener,
priv incoming: Tube<Result<~RtioTcpStreamObject, IoError>>,
priv incoming: Tube<Result<~RtioTcpStream, IoError>>,
}
impl HomingIO for UvTcpAcceptor {
@ -984,7 +987,7 @@ fn accept_simultaneously(stream: StreamWatcher, a: int) -> Result<(), IoError> {
}
impl RtioTcpAcceptor for UvTcpAcceptor {
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
fn accept(&mut self) -> Result<~RtioTcpStream, IoError> {
do self.home_for_io |self_| {
self_.incoming.recv()
}
@ -1718,7 +1721,7 @@ impl UvUnixListener {
}
impl RtioUnixListener for UvUnixListener {
fn listen(self) -> Result<~RtioUnixAcceptorObject, IoError> {
fn listen(self) -> Result<~RtioUnixAcceptor, IoError> {
do self.home_for_io_consume |self_| {
let acceptor = ~UvUnixAcceptor::new(self_);
let incoming = Cell::new(acceptor.incoming.clone());
@ -1732,14 +1735,14 @@ impl RtioUnixListener for UvUnixListener {
server.accept(inc.as_stream());
let home = get_handle_to_current_scheduler!();
let pipe = UvUnboundPipe::new(inc, home);
Ok(~UvPipeStream::new(pipe))
Ok(~UvPipeStream::new(pipe) as ~RtioPipe)
}
};
incoming.send(inc);
}
};
match res {
Ok(()) => Ok(acceptor),
Ok(()) => Ok(acceptor as ~RtioUnixAcceptor),
Err(e) => Err(uv_error_to_io_error(e)),
}
}
@ -1776,7 +1779,7 @@ impl Drop for UvTTY {
pub struct UvUnixAcceptor {
listener: UvUnixListener,
incoming: Tube<Result<~RtioPipeObject, IoError>>,
incoming: Tube<Result<~RtioPipe, IoError>>,
}
impl HomingIO for UvUnixAcceptor {
@ -1790,7 +1793,7 @@ impl UvUnixAcceptor {
}
impl RtioUnixAcceptor for UvUnixAcceptor {
fn accept(&mut self) -> Result<~RtioPipeObject, IoError> {
fn accept(&mut self) -> Result<~RtioPipe, IoError> {
do self.home_for_io |self_| {
self_.incoming.recv()
}

View File

@ -89,7 +89,7 @@ use unstable::sync::Exclusive;
use rt::in_green_task_context;
use rt::local::Local;
use rt::task::{Task, Sched};
use rt::shouldnt_be_public::{Scheduler, KillHandle, WorkQueue, Thread};
use rt::shouldnt_be_public::{Scheduler, KillHandle, WorkQueue, Thread, EventLoop};
use rt::uv::uvio::UvEventLoop;
#[cfg(test)] use task::default_task_opts;
@ -607,7 +607,7 @@ pub fn spawn_raw(mut opts: TaskOpts, f: ~fn()) {
let work_queue = WorkQueue::new();
// Create a new scheduler to hold the new task
let new_loop = ~UvEventLoop::new();
let new_loop = ~UvEventLoop::new() as ~EventLoop;
let mut new_sched = ~Scheduler::new_special(new_loop,
work_queue,
(*sched).work_queues.clone(),