auto merge of #8631 : anasazi/rust/homing-io, r=brson

libuv handles are tied to the event loop that created them. In order to perform IO, the handle must be on the thread with its home event loop. Thus, when as task wants to do IO it must first go to the IO handle's home event loop and pin itself to the corresponding scheduler while the IO action is in flight. Once the IO action completes, the task is unpinned and either returns to its home scheduler if it is a pinned task, or otherwise stays on the current scheduler.

Making new blocking IO implementations (i.e. files) thread safe is rather simple. Add a home field to the IO handle's struct in uvio and implement the HomingIO trait. Wrap every IO call in the HomingIO.home_for_io method, which will take care of the scheduling.

I'm not sure if this remains thread safe in the presence of asynchronous IO at the libuv level. If we decide to do that, then this set up should be revisited.
This commit is contained in:
bors 2013-08-20 17:12:09 -07:00
commit 0bc1ca4045
8 changed files with 679 additions and 339 deletions

View File

@ -17,7 +17,7 @@ use option::{Option, None, Some};
type Port = u16;
#[deriving(Eq, TotalEq)]
#[deriving(Eq, TotalEq, Clone)]
pub enum IpAddr {
Ipv4Addr(u8, u8, u8, u8),
Ipv6Addr(u16, u16, u16, u16, u16, u16, u16, u16)
@ -62,7 +62,7 @@ impl ToStr for IpAddr {
}
}
#[deriving(Eq, TotalEq)]
#[deriving(Eq, TotalEq, Clone)]
pub struct SocketAddr {
ip: IpAddr,
port: Port,

View File

@ -88,9 +88,7 @@ impl Writer for TcpStream {
fn write(&mut self, buf: &[u8]) {
match (**self).write(buf) {
Ok(_) => (),
Err(ioerr) => {
io_error::cond.raise(ioerr);
}
Err(ioerr) => io_error::cond.raise(ioerr),
}
}
@ -129,9 +127,7 @@ impl TcpListener {
impl Listener<TcpStream> for TcpListener {
fn accept(&mut self) -> Option<TcpStream> {
match (**self).accept() {
Ok(s) => {
Some(TcpStream::new(s))
}
Ok(s) => Some(TcpStream::new(s)),
Err(ioerr) => {
io_error::cond.raise(ioerr);
return None;

View File

@ -41,7 +41,7 @@ impl Timer {
}
impl RtioTimer for Timer {
fn sleep(&self, msecs: u64) {
fn sleep(&mut self, msecs: u64) {
(**self).sleep(msecs);
}
}
@ -50,15 +50,11 @@ impl RtioTimer for Timer {
mod test {
use super::*;
use rt::test::*;
use option::{Some, None};
#[test]
fn test_io_timer_sleep_simple() {
do run_in_newsched_task {
let timer = Timer::new();
match timer {
Some(t) => t.sleep(1),
None => assert!(false)
}
do timer.map_move |mut t| { t.sleep(1) };
}
}
}
}

View File

@ -91,5 +91,5 @@ pub trait RtioUdpSocket : RtioSocket {
}
pub trait RtioTimer {
fn sleep(&self, msecs: u64);
fn sleep(&mut self, msecs: u64);
}

View File

@ -190,9 +190,10 @@ impl StreamWatcher {
extern fn close_cb(handle: *uvll::uv_stream_t) {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
stream_watcher.get_watcher_data().close_cb.take_unwrap()();
let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap();
stream_watcher.drop_watcher_data();
unsafe { free_handle(handle as *c_void) }
cb();
}
}
}
@ -411,9 +412,10 @@ impl UdpWatcher {
extern fn close_cb(handle: *uvll::uv_udp_t) {
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
udp_watcher.get_watcher_data().close_cb.take_unwrap()();
let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap();
udp_watcher.drop_watcher_data();
unsafe { free_handle(handle as *c_void) }
cb();
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -172,6 +172,7 @@ fn request_sanity_check() {
}
}
// XXX Event loops ignore SIGPIPE by default.
pub unsafe fn loop_new() -> *c_void {
#[fixed_stack_segment]; #[inline(never)];
@ -287,7 +288,7 @@ pub unsafe fn get_udp_handle_from_send_req(send_req: *uv_udp_send_t) -> *uv_udp_
return rust_uv_get_udp_handle_from_send_req(send_req);
}
pub unsafe fn udp_get_sockname(handle: *uv_udp_t, name: *sockaddr_storage) -> c_int {
pub unsafe fn udp_getsockname(handle: *uv_udp_t, name: *sockaddr_storage) -> c_int {
#[fixed_stack_segment]; #[inline(never)];
return rust_uv_udp_getsockname(handle, name);

View File

@ -13,12 +13,21 @@
#include <malloc.h>
#endif
#ifndef __WIN32__
// for signal
#include <signal.h>
#endif
#include "uv.h"
#include "rust_globals.h"
extern "C" void*
rust_uv_loop_new() {
// XXX libuv doesn't always ignore SIGPIPE even though we don't need it.
#ifndef __WIN32__
signal(SIGPIPE, SIG_IGN);
#endif
return (void*)uv_loop_new();
}