Merge remote-tracking branch 'brson/io' into incoming

This commit is contained in:
Brian Anderson 2013-05-17 17:53:50 -07:00
commit 03a8e59615
50 changed files with 3024 additions and 1177 deletions

View File

@ -205,8 +205,11 @@ mod unicode;
#[path = "num/cmath.rs"]
mod cmath;
mod stackwalk;
// XXX: This shouldn't be pub, and it should be reexported under 'unstable'
// but name resolution doesn't work without it being pub.
#[path = "rt/mod.rs"]
mod rt;
pub mod rt;
// A curious inner-module that's not exported that contains the binding
// 'core' so that macro-expanded references to core::error and such

View File

@ -10,17 +10,16 @@
//! Logging
pub mod rustrt {
use libc;
pub extern {
unsafe fn rust_log_console_on();
unsafe fn rust_log_console_off();
unsafe fn rust_log_str(level: u32,
string: *libc::c_char,
size: libc::size_t);
}
}
use option::*;
use either::*;
use rt;
use rt::logging::{Logger, StdErrLogger};
use io;
use libc;
use repr;
use vec;
use cast;
use str;
/// Turns on logging to stdout globally
pub fn console_on() {
@ -55,8 +54,46 @@ pub fn log_type<T>(level: u32, object: &T) {
let bytes = do io::with_bytes_writer |writer| {
repr::write_repr(writer, object);
};
unsafe {
let len = bytes.len() as libc::size_t;
rustrt::rust_log_str(level, transmute(vec::raw::to_ptr(bytes)), len);
match rt::context() {
rt::OldTaskContext => {
unsafe {
let len = bytes.len() as libc::size_t;
rustrt::rust_log_str(level, cast::transmute(vec::raw::to_ptr(bytes)), len);
}
}
_ => {
// XXX: Bad allocation
let msg = str::from_bytes(bytes);
newsched_log_str(msg);
}
}
}
fn newsched_log_str(msg: ~str) {
unsafe {
match rt::local_services::unsafe_try_borrow_local_services() {
Some(local) => {
// Use the available logger
(*local).logger.log(Left(msg));
}
None => {
// There is no logger anywhere, just write to stderr
let mut logger = StdErrLogger;
logger.log(Left(msg));
}
}
}
}
pub mod rustrt {
use libc;
pub extern {
unsafe fn rust_log_console_on();
unsafe fn rust_log_console_off();
unsafe fn rust_log_str(level: u32,
string: *libc::c_char,
size: libc::size_t);
}
}

View File

@ -30,10 +30,24 @@ macro_rules! rtdebug (
($( $arg:expr),+) => ( $(let _ = $arg)*; )
)
macro_rules! rtassert (
( $arg:expr ) => ( {
if !$arg {
abort!("assertion failed: %s", stringify!($arg));
}
} )
)
macro_rules! abort(
($( $msg:expr),+) => ( {
rtdebug!($($msg),+);
unsafe { ::libc::abort(); }
do_abort();
// NB: This is in a fn to avoid putting the `unsafe` block in a macro,
// which causes spurious 'unnecessary unsafe block' warnings.
fn do_abort() -> ! {
unsafe { ::libc::abort(); }
}
} )
)

View File

@ -147,23 +147,25 @@ pub mod win32 {
/*
Accessing environment variables is not generally threadsafe.
This uses a per-runtime lock to serialize access.
FIXME #4726: It would probably be appropriate to make this a real global
Serialize access through a global lock.
*/
fn with_env_lock<T>(f: &fn() -> T) -> T {
use unstable::global::global_data_clone_create;
use unstable::sync::{Exclusive, exclusive};
struct SharedValue(());
type ValueMutex = Exclusive<SharedValue>;
fn key(_: ValueMutex) { }
use unstable::finally::Finally;
unsafe {
let lock: ValueMutex = global_data_clone_create(key, || {
~exclusive(SharedValue(()))
});
return do (|| {
rust_take_env_lock();
f()
}).finally {
rust_drop_env_lock();
};
}
lock.with_imm(|_| f() )
extern {
#[fast_ffi]
fn rust_take_env_lock();
#[fast_ffi]
fn rust_drop_env_lock();
}
}
@ -749,7 +751,7 @@ pub fn list_dir(p: &Path) -> ~[~str] {
use os::win32::{
as_utf16_p
};
use unstable::exchange_alloc::{malloc_raw, free_raw};
use rt::global_heap::{malloc_raw, free_raw};
#[nolink]
extern {
unsafe fn rust_list_dir_wfd_size() -> libc::size_t;

View File

@ -84,6 +84,7 @@ pub impl Context {
}
extern {
#[rust_stack]
fn swap_registers(out_regs: *mut Registers, in_regs: *Registers);
}
@ -111,9 +112,9 @@ fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, sp:
let sp = align_down(sp);
let sp = mut_offset(sp, -4);
unsafe { *sp = arg as uint; }
unsafe { *sp = arg as uint };
let sp = mut_offset(sp, -1);
unsafe { *sp = 0; } // The final return address
unsafe { *sp = 0 }; // The final return address
regs.esp = sp as u32;
regs.eip = fptr as u32;
@ -195,7 +196,7 @@ fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, sp:
fn align_down(sp: *mut uint) -> *mut uint {
unsafe {
let sp = transmute::<*mut uint, uint>(sp);
let sp: uint = transmute(sp);
let sp = sp & !(16 - 1);
transmute::<uint, *mut uint>(sp)
}

View File

@ -9,7 +9,7 @@
// except according to those terms.
use sys::{TypeDesc, size_of};
use libc::{c_void, size_t};
use libc::{c_void, size_t, uintptr_t};
use c_malloc = libc::malloc;
use c_free = libc::free;
use managed::raw::{BoxHeaderRepr, BoxRepr};
@ -34,7 +34,7 @@ pub unsafe fn malloc(td: *TypeDesc, size: uint) -> *c_void {
box.header.prev = null();
box.header.next = null();
let exchange_count = &mut *rust_get_exchange_count_ptr();
let exchange_count = &mut *exchange_count_ptr();
atomic_xadd(exchange_count, 1);
return transmute(box);
@ -52,7 +52,7 @@ pub unsafe fn malloc_raw(size: uint) -> *c_void {
}
pub unsafe fn free(ptr: *c_void) {
let exchange_count = &mut *rust_get_exchange_count_ptr();
let exchange_count = &mut *exchange_count_ptr();
atomic_xsub(exchange_count, 1);
assert!(ptr.is_not_null());
@ -77,7 +77,11 @@ fn align_to(size: uint, align: uint) -> uint {
(size + align - 1) & !(align - 1)
}
extern {
#[rust_stack]
fn rust_get_exchange_count_ptr() -> *mut int;
fn exchange_count_ptr() -> *mut int {
// XXX: Need mutable globals
unsafe { transmute(&rust_exchange_count) }
}
extern {
static rust_exchange_count: uintptr_t;
}

File diff suppressed because it is too large Load Diff

View File

@ -10,7 +10,7 @@
use prelude::*;
use super::support::PathLike;
use super::{Reader, Writer, Seek, Close};
use super::{Reader, Writer, Seek};
use super::SeekStyle;
/// # XXX
@ -69,10 +69,6 @@ impl Seek for FileStream {
fn seek(&mut self, _pos: i64, _style: SeekStyle) { fail!() }
}
impl Close for FileStream {
fn close(&mut self) { fail!() }
}
#[test]
#[ignore]
fn super_simple_smoke_test_lets_go_read_some_files_and_have_a_good_time() {

50
src/libcore/rt/io/mock.rs Normal file
View File

@ -0,0 +1,50 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use option::{Option, None};
use rt::io::{Reader, Writer};
pub struct MockReader {
read: ~fn(buf: &mut [u8]) -> Option<uint>,
eof: ~fn() -> bool
}
impl MockReader {
pub fn new() -> MockReader {
MockReader {
read: |_| None,
eof: || false
}
}
}
impl Reader for MockReader {
fn read(&mut self, buf: &mut [u8]) -> Option<uint> { (self.read)(buf) }
fn eof(&mut self) -> bool { (self.eof)() }
}
pub struct MockWriter {
write: ~fn(buf: &[u8]),
flush: ~fn()
}
impl MockWriter {
pub fn new() -> MockWriter {
MockWriter {
write: |_| (),
flush: || ()
}
}
}
impl Writer for MockWriter {
fn write(&mut self, buf: &[u8]) { (self.write)(buf) }
fn flush(&mut self) { (self.flush)() }
}

View File

@ -187,7 +187,7 @@ In particular code written to ignore errors and expect conditions to be unhandle
will start passing around null or zero objects when wrapped in a condition handler.
* XXX: How should we use condition handlers that return values?
* XXX: Should EOF raise default conditions when EOF is not an error?
# Issues withi/o scheduler affinity, work stealing, task pinning
@ -238,6 +238,7 @@ Out of scope
* How does I/O relate to the Iterator trait?
* std::base64 filters
* Using conditions is a big unknown since we don't have much experience with them
* Too many uses of OtherIoError
*/
@ -252,13 +253,18 @@ pub use self::stdio::println;
pub use self::file::FileStream;
pub use self::net::ip::IpAddr;
#[cfg(not(stage0))]
pub use self::net::tcp::TcpListener;
#[cfg(not(stage0))]
pub use self::net::tcp::TcpStream;
pub use self::net::udp::UdpStream;
// Some extension traits that all Readers and Writers get.
#[cfg(not(stage0))] // Requires condition! fixes
pub use self::extensions::ReaderUtil;
#[cfg(not(stage0))] // Requires condition! fixes
pub use self::extensions::ReaderByteConversions;
#[cfg(not(stage0))] // Requires condition! fixes
pub use self::extensions::WriterByteConversions;
/// Synchronous, non-blocking file I/O.
@ -266,6 +272,7 @@ pub mod file;
/// Synchronous, non-blocking network I/O.
pub mod net {
#[cfg(not(stage0))]
pub mod tcp;
pub mod udp;
pub mod ip;
@ -291,6 +298,7 @@ pub mod flate;
pub mod comm_adapters;
/// Extension traits
#[cfg(not(stage0))] // Requires condition! fixes
mod extensions;
/// Non-I/O things needed by the I/O module
@ -312,6 +320,12 @@ pub mod native {
}
}
/// Mock implementations for testing
mod mock;
/// The default buffer size for various I/O operations
/// XXX: Not pub
pub static DEFAULT_BUF_SIZE: uint = 1024 * 64;
/// The type passed to I/O condition handlers to indicate error
///
@ -326,12 +340,16 @@ pub struct IoError {
#[deriving(Eq)]
pub enum IoErrorKind {
PreviousIoError,
OtherIoError,
EndOfFile,
FileNotFound,
FilePermission,
PermissionDenied,
ConnectionFailed,
Closed,
OtherIoError,
PreviousIoError
ConnectionRefused,
ConnectionReset,
BrokenPipe
}
// XXX: Can't put doc comments on macros
@ -341,19 +359,36 @@ condition! {
/*pub*/ io_error: super::IoError -> ();
}
// XXX: Can't put doc comments on macros
// Raised by `read` on error
condition! {
// FIXME (#6009): uncomment `pub` after expansion support lands.
/*pub*/ read_error: super::IoError -> ();
}
pub trait Reader {
/// Read bytes, up to the length of `buf` and place them in `buf`.
/// Returns the number of bytes read, or `None` on EOF.
/// Returns the number of bytes read. The number of bytes read my
/// be less than the number requested, even 0. Returns `None` on EOF.
///
/// # Failure
///
/// Raises the `io_error` condition on error, then returns `None`.
/// Raises the `read_error` condition on error. If the condition
/// is handled then no guarantee is made about the number of bytes
/// read and the contents of `buf`. If the condition is handled
/// returns `None` (XXX see below).
///
/// # XXX
///
/// * Should raise_default error on eof?
/// * If the condition is handled it should still return the bytes read,
/// in which case there's no need to return Option - but then you *have*
/// to install a handler to detect eof.
///
/// This doesn't take a `len` argument like the old `read`.
/// Will people often need to slice their vectors to call this
/// and will that be annoying?
/// Is it actually possible for 0 bytes to be read successfully?
fn read(&mut self, buf: &mut [u8]) -> Option<uint>;
/// Return whether the Reader has reached the end of the stream.
@ -383,16 +418,7 @@ pub trait Writer {
fn flush(&mut self);
}
/// I/O types that may be closed
///
/// Any further operations performed on a closed resource will raise
/// on `io_error`
pub trait Close {
/// Close the I/O resource
fn close(&mut self);
}
pub trait Stream: Reader + Writer + Close { }
pub trait Stream: Reader + Writer { }
pub enum SeekStyle {
/// Seek from the beginning of the stream
@ -466,6 +492,21 @@ pub fn standard_error(kind: IoErrorKind) -> IoError {
detail: None
}
}
EndOfFile => {
IoError {
kind: EndOfFile,
desc: "End of file",
detail: None
}
}
_ => fail!()
}
}
pub fn placeholder_error() -> IoError {
IoError {
kind: OtherIoError,
desc: "Placeholder error. You shouldn't be seeing this",
detail: None
}
}

View File

@ -40,10 +40,6 @@ impl Writer for FileDesc {
fn flush(&mut self) { fail!() }
}
impl Close for FileDesc {
fn close(&mut self) { fail!() }
}
impl Seek for FileDesc {
fn tell(&self) -> u64 { fail!() }
@ -72,10 +68,6 @@ impl Writer for CFile {
fn flush(&mut self) { fail!() }
}
impl Close for CFile {
fn close(&mut self) { fail!() }
}
impl Seek for CFile {
fn tell(&self) -> u64 { fail!() }
fn seek(&mut self, _pos: i64, _style: SeekStyle) { fail!() }

View File

@ -8,67 +8,349 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
use super::super::*;
use super::ip::IpAddr;
use option::{Option, Some, None};
use result::{Ok, Err};
use rt::sched::local_sched::unsafe_borrow_io;
use rt::io::net::ip::IpAddr;
use rt::io::{Reader, Writer, Listener};
use rt::io::{io_error, read_error, EndOfFile};
use rt::rtio::{IoFactory,
RtioTcpListener, RtioTcpListenerObject,
RtioTcpStream, RtioTcpStreamObject};
pub struct TcpStream;
pub struct TcpStream {
rtstream: ~RtioTcpStreamObject
}
impl TcpStream {
pub fn connect(_addr: IpAddr) -> Option<TcpStream> {
fail!()
fn new(s: ~RtioTcpStreamObject) -> TcpStream {
TcpStream {
rtstream: s
}
}
pub fn connect(addr: IpAddr) -> Option<TcpStream> {
let stream = unsafe {
rtdebug!("borrowing io to connect");
let io = unsafe_borrow_io();
rtdebug!("about to connect");
(*io).tcp_connect(addr)
};
match stream {
Ok(s) => {
Some(TcpStream::new(s))
}
Err(ioerr) => {
rtdebug!("failed to connect: %?", ioerr);
io_error::cond.raise(ioerr);
return None;
}
}
}
}
impl Reader for TcpStream {
fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() }
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
let bytes_read = self.rtstream.read(buf);
match bytes_read {
Ok(read) => Some(read),
Err(ioerr) => {
// EOF is indicated by returning None
if ioerr.kind != EndOfFile {
read_error::cond.raise(ioerr);
}
return None;
}
}
}
fn eof(&mut self) -> bool { fail!() }
}
impl Writer for TcpStream {
fn write(&mut self, _buf: &[u8]) { fail!() }
fn write(&mut self, buf: &[u8]) {
let res = self.rtstream.write(buf);
match res {
Ok(_) => (),
Err(ioerr) => {
io_error::cond.raise(ioerr);
}
}
}
fn flush(&mut self) { fail!() }
}
impl Close for TcpStream {
fn close(&mut self) { fail!() }
pub struct TcpListener {
rtlistener: ~RtioTcpListenerObject,
}
pub struct TcpListener;
impl TcpListener {
pub fn bind(_addr: IpAddr) -> Option<TcpListener> {
fail!()
pub fn bind(addr: IpAddr) -> Option<TcpListener> {
let listener = unsafe { (*unsafe_borrow_io()).tcp_bind(addr) };
match listener {
Ok(l) => {
Some(TcpListener {
rtlistener: l
})
}
Err(ioerr) => {
io_error::cond.raise(ioerr);
return None;
}
}
}
}
impl Listener<TcpStream> for TcpListener {
fn accept(&mut self) -> Option<TcpStream> { fail!() }
fn accept(&mut self) -> Option<TcpStream> {
let rtstream = self.rtlistener.accept();
match rtstream {
Ok(s) => {
Some(TcpStream::new(s))
}
Err(ioerr) => {
io_error::cond.raise(ioerr);
return None;
}
}
}
}
#[cfg(test)]
mod test {
use super::*;
use int;
use cell::Cell;
use rt::test::*;
use rt::io::net::ip::Ipv4;
use rt::io::*;
#[test] #[ignore]
fn bind_error() {
do run_in_newsched_task {
let mut called = false;
do io_error::cond.trap(|e| {
assert!(e.kind == PermissionDenied);
called = true;
}).in {
let addr = Ipv4(0, 0, 0, 0, 1);
let listener = TcpListener::bind(addr);
assert!(listener.is_none());
}
assert!(called);
}
}
#[test]
fn connect_error() {
do run_in_newsched_task {
let mut called = false;
do io_error::cond.trap(|e| {
assert!(e.kind == ConnectionRefused);
called = true;
}).in {
let addr = Ipv4(0, 0, 0, 0, 1);
let stream = TcpStream::connect(addr);
assert!(stream.is_none());
}
assert!(called);
}
}
#[test]
fn smoke_test() {
/*do run_in_newsched_task {
do run_in_newsched_task {
let addr = next_test_ip4();
do spawn_immediately {
let listener = TcpListener::bind(addr);
do listener.accept() {
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == 99);
}
do spawntask_immediately {
let mut stream = TcpStream::connect(addr);
stream.write([99]);
}
}
}
#[test]
fn read_eof() {
do run_in_newsched_task {
let addr = next_test_ip4();
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
let nread = stream.read(buf);
assert!(nread.is_none());
}
do spawntask_immediately {
let _stream = TcpStream::connect(addr);
// Close
}
}
}
#[test]
fn read_eof_twice() {
do run_in_newsched_task {
let addr = next_test_ip4();
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
let nread = stream.read(buf);
assert!(nread.is_none());
let nread = stream.read(buf);
assert!(nread.is_none());
}
do spawntask_immediately {
let _stream = TcpStream::connect(addr);
// Close
}
}
}
#[test]
fn write_close() {
do run_in_newsched_task {
let addr = next_test_ip4();
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let buf = [0];
loop {
let mut stop = false;
do io_error::cond.trap(|e| {
// NB: ECONNRESET on linux, EPIPE on mac
assert!(e.kind == ConnectionReset || e.kind == BrokenPipe);
stop = true;
}).in {
stream.write(buf);
}
if stop { break }
}
}
do spawntask_immediately {
let _stream = TcpStream::connect(addr);
// Close
}
}
}
#[test]
fn multiple_connect_serial() {
do run_in_newsched_task {
let addr = next_test_ip4();
let max = 10;
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
for max.times {
let mut stream = listener.accept();
let mut buf = [0];
listener.read(buf);
stream.read(buf);
assert!(buf[0] == 99);
}
}
do spawn_immediately {
let stream = TcpStream::connect(addr);
stream.write([99]);
do spawntask_immediately {
for max.times {
let mut stream = TcpStream::connect(addr);
stream.write([99]);
}
}
}*/
}
}
#[test]
fn multiple_connect_interleaved_greedy_schedule() {
do run_in_newsched_task {
let addr = next_test_ip4();
static MAX: int = 10;
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
for int::range(0, MAX) |i| {
let stream = Cell(listener.accept());
rtdebug!("accepted");
// Start another task to handle the connection
do spawntask_immediately {
let mut stream = stream.take();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == i as u8);
rtdebug!("read");
}
}
}
connect(0, addr);
fn connect(i: int, addr: IpAddr) {
if i == MAX { return }
do spawntask_immediately {
rtdebug!("connecting");
let mut stream = TcpStream::connect(addr);
// Connect again before writing
connect(i + 1, addr);
rtdebug!("writing");
stream.write([i as u8]);
}
}
}
}
#[test]
fn multiple_connect_interleaved_lazy_schedule() {
do run_in_newsched_task {
let addr = next_test_ip4();
static MAX: int = 10;
do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
for int::range(0, MAX) |_| {
let stream = Cell(listener.accept());
rtdebug!("accepted");
// Start another task to handle the connection
do spawntask_later {
let mut stream = stream.take();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == 99);
rtdebug!("read");
}
}
}
connect(0, addr);
fn connect(i: int, addr: IpAddr) {
if i == MAX { return }
do spawntask_later {
rtdebug!("connecting");
let mut stream = TcpStream::connect(addr);
// Connect again before writing
connect(i + 1, addr);
rtdebug!("writing");
stream.write([99]);
}
}
}
}
}

View File

@ -32,10 +32,6 @@ impl Writer for UdpStream {
fn flush(&mut self) { fail!() }
}
impl Close for UdpStream {
fn close(&mut self) { fail!() }
}
pub struct UdpListener;
impl UdpListener {

View File

@ -32,10 +32,6 @@ impl Writer for UnixStream {
fn flush(&mut self) { fail!() }
}
impl Close for UnixStream {
fn close(&mut self) { fail!() }
}
pub struct UnixListener;
impl UnixListener {

View File

@ -18,7 +18,7 @@
use option::*;
use super::{Reader, Writer, Listener};
use super::{standard_error, PreviousIoError, io_error, IoError};
use super::{standard_error, PreviousIoError, io_error, read_error, IoError};
fn prev_io_error() -> IoError {
standard_error(PreviousIoError)
@ -45,7 +45,7 @@ impl<R: Reader> Reader for Option<R> {
match *self {
Some(ref mut reader) => reader.read(buf),
None => {
io_error::cond.raise(prev_io_error());
read_error::cond.raise(prev_io_error());
None
}
}
@ -79,7 +79,7 @@ mod test {
use option::*;
use super::super::mem::*;
use rt::test::*;
use super::super::{PreviousIoError, io_error};
use super::super::{PreviousIoError, io_error, read_error};
#[test]
fn test_option_writer() {
@ -133,7 +133,7 @@ mod test {
let mut buf = [];
let mut called = false;
do io_error::cond.trap(|err| {
do read_error::cond.trap(|err| {
assert!(err.kind == PreviousIoError);
called = true;
}).in {

View File

@ -9,7 +9,7 @@
// except according to those terms.
use prelude::*;
use super::{Reader, Writer, Close};
use super::{Reader, Writer};
pub fn stdin() -> StdReader { fail!() }
@ -39,10 +39,6 @@ impl Reader for StdReader {
fn eof(&mut self) -> bool { fail!() }
}
impl Close for StdReader {
fn close(&mut self) { fail!() }
}
pub struct StdWriter;
impl StdWriter {
@ -55,6 +51,3 @@ impl Writer for StdWriter {
fn flush(&mut self) { fail!() }
}
impl Close for StdWriter {
fn close(&mut self) { fail!() }
}

View File

@ -13,18 +13,21 @@
use prelude::*;
use ptr::mut_null;
use libc::c_void;
use cast::transmute;
use cast;
use cell::Cell;
use super::Scheduler;
use super::super::rtio::IoFactoryObject;
use tls = super::super::thread_local_storage;
#[cfg(test)] use super::super::uvio::UvEventLoop;
use rt::sched::Scheduler;
use rt::rtio::{EventLoop, IoFactoryObject};
use tls = rt::thread_local_storage;
use unstable::finally::Finally;
#[cfg(test)] use rt::uv::uvio::UvEventLoop;
/// Give the Scheduler to thread-local storage
pub fn put(sched: ~Scheduler) {
unsafe {
let key = tls_key();
let void_sched: *mut c_void = transmute::<~Scheduler, *mut c_void>(sched);
let void_sched: *mut c_void = cast::transmute(sched);
tls::set(key, void_sched);
}
}
@ -34,8 +37,8 @@ pub fn take() -> ~Scheduler {
unsafe {
let key = tls_key();
let void_sched: *mut c_void = tls::get(key);
assert!(void_sched.is_not_null());
let sched = transmute::<*mut c_void, ~Scheduler>(void_sched);
rtassert!(void_sched.is_not_null());
let sched: ~Scheduler = cast::transmute(void_sched);
tls::set(key, mut_null());
return sched;
}
@ -55,8 +58,18 @@ pub fn exists() -> bool {
/// While the scheduler is borrowed it is not available in TLS.
pub fn borrow(f: &fn(&mut Scheduler)) {
let mut sched = take();
f(sched);
put(sched);
// XXX: Need a different abstraction from 'finally' here to avoid unsafety
unsafe {
let unsafe_sched = cast::transmute_mut_region(&mut *sched);
let sched = Cell(sched);
do (|| {
f(unsafe_sched);
}).finally {
put(sched.take());
}
}
}
/// Borrow a mutable reference to the thread-local Scheduler
@ -65,33 +78,35 @@ pub fn borrow(f: &fn(&mut Scheduler)) {
///
/// Because this leaves the Scheduler in thread-local storage it is possible
/// For the Scheduler pointer to be aliased
pub unsafe fn unsafe_borrow() -> &mut Scheduler {
pub unsafe fn unsafe_borrow() -> *mut Scheduler {
let key = tls_key();
let mut void_sched: *mut c_void = tls::get(key);
assert!(void_sched.is_not_null());
rtassert!(void_sched.is_not_null());
{
let void_sched_ptr = &mut void_sched;
let sched: &mut ~Scheduler = {
transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr)
};
let sched: &mut Scheduler = &mut **sched;
let sched: *mut *mut c_void = &mut void_sched;
let sched: *mut ~Scheduler = sched as *mut ~Scheduler;
let sched: *mut Scheduler = &mut **sched;
return sched;
}
}
pub unsafe fn unsafe_borrow_io() -> &mut IoFactoryObject {
pub unsafe fn unsafe_borrow_io() -> *mut IoFactoryObject {
let sched = unsafe_borrow();
return sched.event_loop.io().unwrap();
let io: *mut IoFactoryObject = (*sched).event_loop.io().unwrap();
return io;
}
fn tls_key() -> tls::Key {
maybe_tls_key().get()
match maybe_tls_key() {
Some(key) => key,
None => abort!("runtime tls key not initialized")
}
}
fn maybe_tls_key() -> Option<tls::Key> {
unsafe {
let key: *mut c_void = rust_get_sched_tls_key();
let key: &mut tls::Key = transmute(key);
let key: *mut c_void = rust_get_rt_tls_key();
let key: &mut tls::Key = cast::transmute(key);
let key = *key;
// Check that the key has been initialized.
@ -105,7 +120,7 @@ fn maybe_tls_key() -> Option<tls::Key> {
// another thread. I think this is fine since the only action
// they could take if it was initialized would be to check the
// thread-local value and see that it's not set.
if key != 0 {
if key != -1 {
return Some(key);
} else {
return None;
@ -114,7 +129,8 @@ fn maybe_tls_key() -> Option<tls::Key> {
}
extern {
fn rust_get_sched_tls_key() -> *mut c_void;
#[fast_ffi]
fn rust_get_rt_tls_key() -> *mut c_void;
}
#[test]

View File

@ -23,19 +23,19 @@ use libc::{c_void, uintptr_t};
use cast::transmute;
use super::sched::local_sched;
use super::local_heap::LocalHeap;
use rt::logging::StdErrLogger;
pub struct LocalServices {
heap: LocalHeap,
gc: GarbageCollector,
storage: LocalStorage,
logger: Logger,
logger: StdErrLogger,
unwinder: Option<Unwinder>,
destroyed: bool
}
pub struct GarbageCollector;
pub struct LocalStorage(*c_void, Option<~fn(*c_void)>);
pub struct Logger;
pub struct Unwinder {
unwinding: bool,
@ -47,7 +47,7 @@ impl LocalServices {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(ptr::null(), None),
logger: Logger,
logger: StdErrLogger,
unwinder: Some(Unwinder { unwinding: false }),
destroyed: false
}
@ -58,7 +58,7 @@ impl LocalServices {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(ptr::null(), None),
logger: Logger,
logger: StdErrLogger,
unwinder: None,
destroyed: false
}
@ -169,19 +169,27 @@ pub fn borrow_local_services(f: &fn(&mut LocalServices)) {
}
}
pub unsafe fn unsafe_borrow_local_services() -> &mut LocalServices {
use cast::transmute_mut_region;
match local_sched::unsafe_borrow().current_task {
pub unsafe fn unsafe_borrow_local_services() -> *mut LocalServices {
match (*local_sched::unsafe_borrow()).current_task {
Some(~ref mut task) => {
transmute_mut_region(&mut task.local_services)
let s: *mut LocalServices = &mut task.local_services;
return s;
}
None => {
fail!("no local services for schedulers yet")
// Don't fail. Infinite recursion
abort!("no local services for schedulers yet")
}
}
}
pub unsafe fn unsafe_try_borrow_local_services() -> Option<*mut LocalServices> {
if local_sched::exists() {
Some(unsafe_borrow_local_services())
} else {
None
}
}
#[cfg(test)]
mod test {
use rt::test::*;
@ -229,4 +237,12 @@ mod test {
let _ = r.next();
}
}
#[test]
fn logging() {
do run_in_newsched_task() {
info!("here i am. logging in a newsched task");
}
}
}

68
src/libcore/rt/logging.rs Normal file
View File

@ -0,0 +1,68 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use either::*;
pub trait Logger {
fn log(&mut self, msg: Either<~str, &'static str>);
}
pub struct StdErrLogger;
impl Logger for StdErrLogger {
fn log(&mut self, msg: Either<~str, &'static str>) {
use io::{Writer, WriterUtil};
let s: &str = match msg {
Left(ref s) => {
let s: &str = *s;
s
}
Right(ref s) => {
let s: &str = *s;
s
}
};
let dbg = ::libc::STDERR_FILENO as ::io::fd_t;
dbg.write_str(s);
dbg.write_str("\n");
dbg.flush();
}
}
/// Configure logging by traversing the crate map and setting the
/// per-module global logging flags based on the logging spec
pub fn init(crate_map: *u8) {
use os;
use str;
use ptr;
use option::{Some, None};
use libc::c_char;
let log_spec = os::getenv("RUST_LOG");
match log_spec {
Some(spec) => {
do str::as_c_str(spec) |s| {
unsafe {
rust_update_log_settings(crate_map, s);
}
}
}
None => {
unsafe {
rust_update_log_settings(crate_map, ptr::null());
}
}
}
extern {
fn rust_update_log_settings(crate_map: *u8, settings: *c_char);
}
}

View File

@ -8,70 +8,161 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/*! The Rust runtime, including the scheduler and I/O interface */
/*! The Rust Runtime, including the task scheduler and I/O
The `rt` module provides the private runtime infrastructure necessary
to support core language features like the exchange and local heap,
the garbage collector, logging, local data and unwinding. It also
implements the default task scheduler and task model. Initialization
routines are provided for setting up runtime resources in common
configurations, including that used by `rustc` when generating
executables.
It is intended that the features provided by `rt` can be factored in a
way such that the core library can be built with different 'profiles'
for different use cases, e.g. excluding the task scheduler. A number
of runtime features though are critical to the functioning of the
language and an implementation must be provided regardless of the
execution environment.
Of foremost importance is the global exchange heap, in the module
`global_heap`. Very little practical Rust code can be written without
access to the global heap. Unlike most of `rt` the global heap is
truly a global resource and generally operates independently of the
rest of the runtime.
All other runtime features are 'local', either thread-local or
task-local. Those critical to the functioning of the language are
defined in the module `local_services`. Local services are those which
are expected to be available to Rust code generally but rely on
thread- or task-local state. These currently include the local heap,
the garbage collector, local storage, logging and the stack unwinder.
Local services are primarily implemented for tasks, but may also
be implemented for use outside of tasks.
The relationship between `rt` and the rest of the core library is
not entirely clear yet and some modules will be moving into or
out of `rt` as development proceeds.
Several modules in `core` are clients of `rt`:
* `core::task` - The user-facing interface to the Rust task model.
* `core::task::local_data` - The interface to local data.
* `core::gc` - The garbage collector.
* `core::unstable::lang` - Miscellaneous lang items, some of which rely on `core::rt`.
* `core::condition` - Uses local data.
* `core::cleanup` - Local heap destruction.
* `core::io` - In the future `core::io` will use an `rt` implementation.
* `core::logging`
* `core::pipes`
* `core::comm`
* `core::stackwalk`
*/
#[doc(hidden)];
use libc::c_char;
use ptr::Ptr;
#[path = "sched/mod.rs"]
/// The global (exchange) heap.
pub mod global_heap;
/// The Scheduler and Coroutine types.
mod sched;
mod rtio;
pub mod uvll;
mod uvio;
#[path = "uv/mod.rs"]
mod uv;
/// Thread-local access to the current Scheduler.
pub mod local_sched;
/// Synchronous I/O.
#[path = "io/mod.rs"]
mod io;
// FIXME #5248: The import in `sched` doesn't resolve unless this is pub!
pub mod thread_local_storage;
mod work_queue;
mod stack;
mod context;
mod thread;
pub mod env;
pub mod io;
/// Thread-local implementations of language-critical runtime features like @.
pub mod local_services;
/// The EventLoop and internal synchronous I/O interface.
mod rtio;
/// libuv and default rtio implementation.
#[path = "uv/mod.rs"]
pub mod uv;
// FIXME #5248: The import in `sched` doesn't resolve unless this is pub!
/// Bindings to pthread/windows thread-local storage.
pub mod thread_local_storage;
/// A parallel work-stealing dequeue.
mod work_queue;
/// Stack segments and caching.
mod stack;
/// CPU context swapping.
mod context;
/// Bindings to system threading libraries.
mod thread;
/// The runtime configuration, read from environment variables
pub mod env;
/// The local, managed heap
mod local_heap;
/// The Logger trait and implementations
pub mod logging;
/// Tools for testing the runtime
#[cfg(test)]
pub mod test;
pub fn start(main: *u8, _argc: int, _argv: **c_char, _crate_map: *u8) -> int {
/// Reference counting
pub mod rc;
use self::sched::{Scheduler, Task};
use self::uvio::UvEventLoop;
use sys::Closure;
use ptr;
use cast;
/// A simple single-threaded channel type for passing buffered data between
/// scheduler and task context
pub mod tube;
/// Set up a default runtime configuration, given compiler-supplied arguments.
///
/// This is invoked by the `start` _language item_ (unstable::lang) to
/// run a Rust executable.
///
/// # Arguments
///
/// * `argc` & `argv` - The argument vector. On Unix this information is used
/// by os::args.
/// * `crate_map` - Runtime information about the executing crate, mostly for logging
///
/// # Return value
///
/// The return value is used as the process return code. 0 on success, 101 on error.
pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int {
use self::sched::{Scheduler, Coroutine};
use self::uv::uvio::UvEventLoop;
init(crate_map);
let loop_ = ~UvEventLoop::new();
let mut sched = ~Scheduler::new(loop_);
let main_task = ~Coroutine::new(&mut sched.stack_pool, main);
let main_task = ~do Task::new(&mut sched.stack_pool) {
unsafe {
// `main` is an `fn() -> ()` that doesn't take an environment
// XXX: Could also call this as an `extern "Rust" fn` once they work
let main = Closure {
code: main as *(),
env: ptr::null(),
};
let mainfn: &fn() = cast::transmute(main);
mainfn();
}
};
sched.task_queue.push_back(main_task);
sched.enqueue_task(main_task);
sched.run();
return 0;
}
/// One-time runtime initialization. Currently all this does is set up logging
/// based on the RUST_LOG environment variable.
pub fn init(crate_map: *u8) {
logging::init(crate_map);
}
/// Possible contexts in which Rust code may be executing.
/// Different runtime services are available depending on context.
/// Mostly used for determining if we're using the new scheduler
/// or the old scheduler.
#[deriving(Eq)]
pub enum RuntimeContext {
// Only the exchange heap is available
@ -84,6 +175,7 @@ pub enum RuntimeContext {
OldTaskContext
}
/// Determine the current RuntimeContext
pub fn context() -> RuntimeContext {
use task::rt::rust_task;
@ -118,26 +210,26 @@ pub fn context() -> RuntimeContext {
#[test]
fn test_context() {
use unstable::run_in_bare_thread;
use self::sched::{local_sched, Task};
use self::uvio::UvEventLoop;
use self::sched::{local_sched, Coroutine};
use rt::uv::uvio::UvEventLoop;
use cell::Cell;
assert!(context() == OldTaskContext);
do run_in_bare_thread {
assert!(context() == GlobalContext);
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Task::new(&mut sched.stack_pool) {
let task = ~do Coroutine::new(&mut sched.stack_pool) {
assert!(context() == TaskContext);
let sched = local_sched::take();
do sched.deschedule_running_task_and_then() |task| {
assert!(context() == SchedulerContext);
let task = Cell(task);
do local_sched::borrow |sched| {
sched.task_queue.push_back(task.take());
sched.enqueue_task(task.take());
}
}
};
sched.task_queue.push_back(task);
sched.enqueue_task(task);
sched.run();
}
}

142
src/libcore/rt/rc.rs Normal file
View File

@ -0,0 +1,142 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! An owned, task-local, reference counted type
//!
//! # Safety note
//!
//! XXX There is currently no type-system mechanism for enforcing that
//! reference counted types are both allocated on the exchange heap
//! and also non-sendable
//!
//! This doesn't prevent borrowing multiple aliasable mutable pointers
use ops::Drop;
use clone::Clone;
use libc::c_void;
use cast;
pub struct RC<T> {
p: *c_void // ~(uint, T)
}
impl<T> RC<T> {
pub fn new(val: T) -> RC<T> {
unsafe {
let v = ~(1, val);
let p: *c_void = cast::transmute(v);
RC { p: p }
}
}
fn get_mut_state(&mut self) -> *mut (uint, T) {
unsafe {
let p: &mut ~(uint, T) = cast::transmute(&mut self.p);
let p: *mut (uint, T) = &mut **p;
return p;
}
}
fn get_state(&self) -> *(uint, T) {
unsafe {
let p: &~(uint, T) = cast::transmute(&self.p);
let p: *(uint, T) = &**p;
return p;
}
}
pub fn unsafe_borrow_mut(&mut self) -> *mut T {
unsafe {
match *self.get_mut_state() {
(_, ref mut p) => {
let p: *mut T = p;
return p;
}
}
}
}
pub fn refcount(&self) -> uint {
unsafe {
match *self.get_state() {
(count, _) => count
}
}
}
}
#[unsafe_destructor]
impl<T> Drop for RC<T> {
fn finalize(&self) {
assert!(self.refcount() > 0);
unsafe {
// XXX: Mutable finalizer
let this: &mut RC<T> = cast::transmute_mut(self);
match *this.get_mut_state() {
(ref mut count, _) => {
*count = *count - 1
}
}
if this.refcount() == 0 {
let _: ~(uint, T) = cast::transmute(this.p);
}
}
}
}
impl<T> Clone for RC<T> {
fn clone(&self) -> RC<T> {
unsafe {
// XXX: Mutable clone
let this: &mut RC<T> = cast::transmute_mut(self);
match *this.get_mut_state() {
(ref mut count, _) => {
*count = *count + 1;
}
}
}
RC { p: self.p }
}
}
#[cfg(test)]
mod test {
use super::RC;
#[test]
fn smoke_test() {
unsafe {
let mut v1 = RC::new(100);
assert!(*v1.unsafe_borrow_mut() == 100);
assert!(v1.refcount() == 1);
let mut v2 = v1.clone();
assert!(*v2.unsafe_borrow_mut() == 100);
assert!(v2.refcount() == 2);
*v2.unsafe_borrow_mut() = 200;
assert!(*v2.unsafe_borrow_mut() == 200);
assert!(*v1.unsafe_borrow_mut() == 200);
let v3 = v2.clone();
assert!(v3.refcount() == 3);
{
let _v1 = v1;
let _v2 = v2;
}
assert!(v3.refcount() == 1);
}
}
}

View File

@ -11,32 +11,35 @@
use option::*;
use result::*;
use rt::io::IoError;
use super::io::net::ip::IpAddr;
use rt::uv::uvio;
// XXX: ~object doesn't work currently so these are some placeholder
// types to use instead
pub type EventLoopObject = super::uvio::UvEventLoop;
pub type IoFactoryObject = super::uvio::UvIoFactory;
pub type StreamObject = super::uvio::UvStream;
pub type TcpListenerObject = super::uvio::UvTcpListener;
pub type EventLoopObject = uvio::UvEventLoop;
pub type IoFactoryObject = uvio::UvIoFactory;
pub type RtioTcpStreamObject = uvio::UvTcpStream;
pub type RtioTcpListenerObject = uvio::UvTcpListener;
pub trait EventLoop {
fn run(&mut self);
fn callback(&mut self, ~fn());
fn callback_ms(&mut self, ms: u64, ~fn());
/// The asynchronous I/O services. Not all event loops may provide one
fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>;
}
pub trait IoFactory {
fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject>;
fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject>;
fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>;
fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>;
}
pub trait TcpListener {
fn listen(&mut self) -> Option<~StreamObject>;
pub trait RtioTcpListener {
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>;
}
pub trait Stream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, ()>;
fn write(&mut self, buf: &[u8]) -> Result<(), ()>;
pub trait RtioTcpStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
}

View File

@ -19,19 +19,15 @@ use super::context::Context;
use super::local_services::LocalServices;
use cell::Cell;
#[cfg(test)] use super::uvio::UvEventLoop;
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use int;
// A more convenient name for external callers, e.g. `local_sched::take()`
pub mod local_sched;
/// The Scheduler is responsible for coordinating execution of Tasks
/// The Scheduler is responsible for coordinating execution of Coroutines
/// on a single thread. When the scheduler is running it is owned by
/// thread local storage and the running task is owned by the
/// scheduler.
pub struct Scheduler {
task_queue: WorkQueue<~Task>,
priv work_queue: WorkQueue<~Coroutine>,
stack_pool: StackPool,
/// The event loop used to drive the scheduler and perform I/O
event_loop: ~EventLoopObject,
@ -39,7 +35,7 @@ pub struct Scheduler {
/// Always valid when a task is executing, otherwise not
priv saved_context: Context,
/// The currently executing task
current_task: Option<~Task>,
current_task: Option<~Coroutine>,
/// An action performed after a context switch on behalf of the
/// code running before the context switch
priv cleanup_job: Option<CleanupJob>
@ -49,17 +45,17 @@ pub struct Scheduler {
// complaining
type UnsafeTaskReceiver = sys::Closure;
trait ClosureConverter {
fn from_fn(&fn(~Task)) -> Self;
fn to_fn(self) -> &fn(~Task);
fn from_fn(&fn(~Coroutine)) -> Self;
fn to_fn(self) -> &fn(~Coroutine);
}
impl ClosureConverter for UnsafeTaskReceiver {
fn from_fn(f: &fn(~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
fn to_fn(self) -> &fn(~Task) { unsafe { transmute(self) } }
fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } }
fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } }
}
enum CleanupJob {
DoNothing,
GiveTask(~Task, UnsafeTaskReceiver)
GiveTask(~Coroutine, UnsafeTaskReceiver)
}
pub impl Scheduler {
@ -76,7 +72,7 @@ pub impl Scheduler {
Scheduler {
event_loop: event_loop,
task_queue: WorkQueue::new(),
work_queue: WorkQueue::new(),
stack_pool: StackPool::new(),
saved_context: Context::empty(),
current_task: None,
@ -91,43 +87,56 @@ pub impl Scheduler {
fn run(~self) -> ~Scheduler {
assert!(!self.in_task_context());
// Give ownership of the scheduler (self) to the thread
local_sched::put(self);
let mut self_sched = self;
unsafe {
let scheduler = local_sched::unsafe_borrow();
fn run_scheduler_once() {
let scheduler = local_sched::take();
if scheduler.resume_task_from_queue() {
// Ok, a task ran. Nice! We'll do it again later
do local_sched::borrow |scheduler| {
scheduler.event_loop.callback(run_scheduler_once);
}
}
}
let event_loop: *mut ~EventLoopObject = {
let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop;
event_loop
};
scheduler.event_loop.callback(run_scheduler_once);
scheduler.event_loop.run();
// Give ownership of the scheduler (self) to the thread
local_sched::put(self_sched);
(*event_loop).run();
}
return local_sched::take();
let sched = local_sched::take();
assert!(sched.work_queue.is_empty());
return sched;
}
/// Schedule a task to be executed later.
///
/// Pushes the task onto the work stealing queue and tells the event loop
/// to run it later. Always use this instead of pushing to the work queue
/// directly.
fn enqueue_task(&mut self, task: ~Coroutine) {
self.work_queue.push_front(task);
self.event_loop.callback(resume_task_from_queue);
fn resume_task_from_queue() {
let scheduler = local_sched::take();
scheduler.resume_task_from_queue();
}
}
// * Scheduler-context operations
fn resume_task_from_queue(~self) -> bool {
fn resume_task_from_queue(~self) {
assert!(!self.in_task_context());
rtdebug!("looking in work queue for task to schedule");
let mut this = self;
match this.task_queue.pop_front() {
match this.work_queue.pop_front() {
Some(task) => {
rtdebug!("resuming task from work queue");
this.resume_task_immediately(task);
return true;
}
None => {
rtdebug!("no tasks in queue");
local_sched::put(this);
return false;
}
}
}
@ -151,20 +160,20 @@ pub impl Scheduler {
abort!("control reached end of task");
}
fn schedule_new_task(~self, task: ~Task) {
fn schedule_new_task(~self, task: ~Coroutine) {
assert!(self.in_task_context());
do self.switch_running_tasks_and_then(task) |last_task| {
let last_task = Cell(last_task);
do local_sched::borrow |sched| {
sched.task_queue.push_front(last_task.take());
sched.enqueue_task(last_task.take());
}
}
}
// Core scheduling ops
fn resume_task_immediately(~self, task: ~Task) {
fn resume_task_immediately(~self, task: ~Coroutine) {
let mut this = self;
assert!(!this.in_task_context());
@ -179,7 +188,7 @@ pub impl Scheduler {
// Take pointers to both the task and scheduler's saved registers.
unsafe {
let sched = local_sched::unsafe_borrow();
let (sched_context, _, next_task_context) = sched.get_contexts();
let (sched_context, _, next_task_context) = (*sched).get_contexts();
let next_task_context = next_task_context.unwrap();
// Context switch to the task, restoring it's registers
// and saving the scheduler's
@ -187,10 +196,10 @@ pub impl Scheduler {
let sched = local_sched::unsafe_borrow();
// The running task should have passed ownership elsewhere
assert!(sched.current_task.is_none());
assert!((*sched).current_task.is_none());
// Running tasks may have asked us to do some cleanup
sched.run_cleanup_job();
(*sched).run_cleanup_job();
}
}
@ -202,40 +211,44 @@ pub impl Scheduler {
/// The closure here is a *stack* closure that lives in the
/// running task. It gets transmuted to the scheduler's lifetime
/// and called while the task is blocked.
fn deschedule_running_task_and_then(~self, f: &fn(~Task)) {
fn deschedule_running_task_and_then(~self, f: &fn(~Coroutine)) {
let mut this = self;
assert!(this.in_task_context());
rtdebug!("blocking task");
let blocked_task = this.current_task.swap_unwrap();
let f_fake_region = unsafe { transmute::<&fn(~Task), &fn(~Task)>(f) };
let f_opaque = ClosureConverter::from_fn(f_fake_region);
this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
unsafe {
let blocked_task = this.current_task.swap_unwrap();
let f_fake_region = transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f);
let f_opaque = ClosureConverter::from_fn(f_fake_region);
this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
}
local_sched::put(this);
let sched = unsafe { local_sched::unsafe_borrow() };
let (sched_context, last_task_context, _) = sched.get_contexts();
let last_task_context = last_task_context.unwrap();
Context::swap(last_task_context, sched_context);
unsafe {
let sched = local_sched::unsafe_borrow();
let (sched_context, last_task_context, _) = (*sched).get_contexts();
let last_task_context = last_task_context.unwrap();
Context::swap(last_task_context, sched_context);
// We could be executing in a different thread now
let sched = unsafe { local_sched::unsafe_borrow() };
sched.run_cleanup_job();
// We could be executing in a different thread now
let sched = local_sched::unsafe_borrow();
(*sched).run_cleanup_job();
}
}
/// Switch directly to another task, without going through the scheduler.
/// You would want to think hard about doing this, e.g. if there are
/// pending I/O events it would be a bad idea.
fn switch_running_tasks_and_then(~self, next_task: ~Task, f: &fn(~Task)) {
fn switch_running_tasks_and_then(~self, next_task: ~Coroutine, f: &fn(~Coroutine)) {
let mut this = self;
assert!(this.in_task_context());
rtdebug!("switching tasks");
let old_running_task = this.current_task.swap_unwrap();
let f_fake_region = unsafe { transmute::<&fn(~Task), &fn(~Task)>(f) };
let f_fake_region = unsafe { transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f) };
let f_opaque = ClosureConverter::from_fn(f_fake_region);
this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque));
this.current_task = Some(next_task);
@ -244,14 +257,14 @@ pub impl Scheduler {
unsafe {
let sched = local_sched::unsafe_borrow();
let (_, last_task_context, next_task_context) = sched.get_contexts();
let (_, last_task_context, next_task_context) = (*sched).get_contexts();
let last_task_context = last_task_context.unwrap();
let next_task_context = next_task_context.unwrap();
Context::swap(last_task_context, next_task_context);
// We could be executing in a different thread now
let sched = local_sched::unsafe_borrow();
sched.run_cleanup_job();
(*sched).run_cleanup_job();
}
}
@ -301,7 +314,7 @@ pub impl Scheduler {
// because borrowck thinks the three patterns are conflicting
// borrows
unsafe {
let last_task = transmute::<Option<&Task>, Option<&mut Task>>(last_task);
let last_task = transmute::<Option<&Coroutine>, Option<&mut Coroutine>>(last_task);
let last_task_context = match last_task {
Some(t) => Some(&mut t.saved_context), None => None
};
@ -316,9 +329,9 @@ pub impl Scheduler {
}
}
static TASK_MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack
static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack
pub struct Task {
pub struct Coroutine {
/// The segment of stack on which the task is currently running or,
/// if the task is blocked, on which the task will resume execution
priv current_stack_segment: StackSegment,
@ -329,19 +342,19 @@ pub struct Task {
local_services: LocalServices
}
pub impl Task {
fn new(stack_pool: &mut StackPool, start: ~fn()) -> Task {
Task::with_local(stack_pool, LocalServices::new(), start)
pub impl Coroutine {
fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine {
Coroutine::with_local(stack_pool, LocalServices::new(), start)
}
fn with_local(stack_pool: &mut StackPool,
local_services: LocalServices,
start: ~fn()) -> Task {
let start = Task::build_start_wrapper(start);
let mut stack = stack_pool.take_segment(TASK_MIN_STACK_SIZE);
start: ~fn()) -> Coroutine {
let start = Coroutine::build_start_wrapper(start);
let mut stack = stack_pool.take_segment(MIN_STACK_SIZE);
// NB: Context holds a pointer to that ~fn
let initial_context = Context::new(start, &mut stack);
return Task {
return Coroutine {
current_stack_segment: stack,
saved_context: initial_context,
local_services: local_services
@ -356,10 +369,10 @@ pub impl Task {
// have asked us to do some cleanup.
unsafe {
let sched = local_sched::unsafe_borrow();
sched.run_cleanup_job();
(*sched).run_cleanup_job();
let sched = local_sched::unsafe_borrow();
let task = sched.current_task.get_mut_ref();
let task = (*sched).current_task.get_mut_ref();
// FIXME #6141: shouldn't neet to put `start()` in another closure
task.local_services.run(||start());
}
@ -373,125 +386,160 @@ pub impl Task {
/// Destroy the task and try to reuse its components
fn recycle(~self, stack_pool: &mut StackPool) {
match self {
~Task {current_stack_segment, _} => {
~Coroutine {current_stack_segment, _} => {
stack_pool.give_segment(current_stack_segment);
}
}
}
}
#[test]
fn test_simple_scheduling() {
do run_in_bare_thread {
let mut task_ran = false;
let task_ran_ptr: *mut bool = &mut task_ran;
#[cfg(test)]
mod test {
use int;
use cell::Cell;
use rt::uv::uvio::UvEventLoop;
use unstable::run_in_bare_thread;
use task::spawn;
use rt::test::*;
use super::*;
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Task::new(&mut sched.stack_pool) {
unsafe { *task_ran_ptr = true; }
};
sched.task_queue.push_back(task);
sched.run();
assert!(task_ran);
}
}
#[test]
fn test_simple_scheduling() {
do run_in_bare_thread {
let mut task_ran = false;
let task_ran_ptr: *mut bool = &mut task_ran;
#[test]
fn test_several_tasks() {
do run_in_bare_thread {
let total = 10;
let mut task_count = 0;
let task_count_ptr: *mut int = &mut task_count;
let mut sched = ~UvEventLoop::new_scheduler();
for int::range(0, total) |_| {
let task = ~do Task::new(&mut sched.stack_pool) {
unsafe { *task_count_ptr = *task_count_ptr + 1; }
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Coroutine::new(&mut sched.stack_pool) {
unsafe { *task_ran_ptr = true; }
};
sched.task_queue.push_back(task);
sched.enqueue_task(task);
sched.run();
assert!(task_ran);
}
sched.run();
assert!(task_count == total);
}
}
#[test]
fn test_swap_tasks_then() {
do run_in_bare_thread {
let mut count = 0;
let count_ptr: *mut int = &mut count;
#[test]
fn test_several_tasks() {
do run_in_bare_thread {
let total = 10;
let mut task_count = 0;
let task_count_ptr: *mut int = &mut task_count;
let mut sched = ~UvEventLoop::new_scheduler();
let task1 = ~do Task::new(&mut sched.stack_pool) {
unsafe { *count_ptr = *count_ptr + 1; }
let mut sched = local_sched::take();
let task2 = ~do Task::new(&mut sched.stack_pool) {
let mut sched = ~UvEventLoop::new_scheduler();
for int::range(0, total) |_| {
let task = ~do Coroutine::new(&mut sched.stack_pool) {
unsafe { *task_count_ptr = *task_count_ptr + 1; }
};
sched.enqueue_task(task);
}
sched.run();
assert!(task_count == total);
}
}
#[test]
fn test_swap_tasks_then() {
do run_in_bare_thread {
let mut count = 0;
let count_ptr: *mut int = &mut count;
let mut sched = ~UvEventLoop::new_scheduler();
let task1 = ~do Coroutine::new(&mut sched.stack_pool) {
unsafe { *count_ptr = *count_ptr + 1; }
let mut sched = local_sched::take();
let task2 = ~do Coroutine::new(&mut sched.stack_pool) {
unsafe { *count_ptr = *count_ptr + 1; }
};
// Context switch directly to the new task
do sched.switch_running_tasks_and_then(task2) |task1| {
let task1 = Cell(task1);
do local_sched::borrow |sched| {
sched.enqueue_task(task1.take());
}
}
unsafe { *count_ptr = *count_ptr + 1; }
};
// Context switch directly to the new task
do sched.switch_running_tasks_and_then(task2) |task1| {
let task1 = Cell(task1);
do local_sched::borrow |sched| {
sched.task_queue.push_front(task1.take());
}
}
unsafe { *count_ptr = *count_ptr + 1; }
};
sched.task_queue.push_back(task1);
sched.run();
assert!(count == 3);
sched.enqueue_task(task1);
sched.run();
assert!(count == 3);
}
}
}
#[bench] #[test] #[ignore(reason = "long test")]
fn test_run_a_lot_of_tasks_queued() {
do run_in_bare_thread {
static MAX: int = 1000000;
let mut count = 0;
let count_ptr: *mut int = &mut count;
#[bench] #[test] #[ignore(reason = "long test")]
fn test_run_a_lot_of_tasks_queued() {
do run_in_bare_thread {
static MAX: int = 1000000;
let mut count = 0;
let count_ptr: *mut int = &mut count;
let mut sched = ~UvEventLoop::new_scheduler();
let mut sched = ~UvEventLoop::new_scheduler();
let start_task = ~do Task::new(&mut sched.stack_pool) {
run_task(count_ptr);
};
sched.task_queue.push_back(start_task);
sched.run();
let start_task = ~do Coroutine::new(&mut sched.stack_pool) {
run_task(count_ptr);
};
sched.enqueue_task(start_task);
sched.run();
assert!(count == MAX);
assert!(count == MAX);
fn run_task(count_ptr: *mut int) {
do local_sched::borrow |sched| {
let task = ~do Task::new(&mut sched.stack_pool) {
unsafe {
*count_ptr = *count_ptr + 1;
if *count_ptr != MAX {
run_task(count_ptr);
fn run_task(count_ptr: *mut int) {
do local_sched::borrow |sched| {
let task = ~do Coroutine::new(&mut sched.stack_pool) {
unsafe {
*count_ptr = *count_ptr + 1;
if *count_ptr != MAX {
run_task(count_ptr);
}
}
}
};
sched.task_queue.push_back(task);
}
};
};
sched.enqueue_task(task);
}
};
}
}
}
#[test]
fn test_block_task() {
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Task::new(&mut sched.stack_pool) {
let sched = local_sched::take();
assert!(sched.in_task_context());
do sched.deschedule_running_task_and_then() |task| {
let task = Cell(task);
do local_sched::borrow |sched| {
assert!(!sched.in_task_context());
sched.task_queue.push_back(task.take());
#[test]
fn test_block_task() {
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Coroutine::new(&mut sched.stack_pool) {
let sched = local_sched::take();
assert!(sched.in_task_context());
do sched.deschedule_running_task_and_then() |task| {
let task = Cell(task);
do local_sched::borrow |sched| {
assert!(!sched.in_task_context());
sched.enqueue_task(task.take());
}
}
};
sched.enqueue_task(task);
sched.run();
}
}
#[test]
fn test_io_callback() {
// This is a regression test that when there are no schedulable tasks
// in the work queue, but we are performing I/O, that once we do put
// something in the work queue again the scheduler picks it up and doesn't
// exit before emptying the work queue
do run_in_newsched_task {
do spawn {
let sched = local_sched::take();
do sched.deschedule_running_task_and_then |task| {
let mut sched = local_sched::take();
let task = Cell(task);
do sched.event_loop.callback_ms(10) {
rtdebug!("in callback");
let mut sched = local_sched::take();
sched.enqueue_task(task.take());
local_sched::put(sched);
}
local_sched::put(sched);
}
}
};
sched.task_queue.push_back(task);
sched.run();
}
}
}

View File

@ -11,21 +11,36 @@
use container::Container;
use ptr::Ptr;
use vec;
use ops::Drop;
use libc::{c_uint, uintptr_t};
pub struct StackSegment {
buf: ~[u8]
buf: ~[u8],
valgrind_id: c_uint
}
pub impl StackSegment {
fn new(size: uint) -> StackSegment {
// Crate a block of uninitialized values
let mut stack = vec::with_capacity(size);
unsafe {
// Crate a block of uninitialized values
let mut stack = vec::with_capacity(size);
vec::raw::set_len(&mut stack, size);
}
StackSegment {
buf: stack
let mut stk = StackSegment {
buf: stack,
valgrind_id: 0
};
// XXX: Using the FFI to call a C macro. Slow
stk.valgrind_id = rust_valgrind_stack_register(stk.start(), stk.end());
return stk;
}
}
/// Point to the low end of the allocated stack
fn start(&self) -> *uint {
unsafe {
vec::raw::to_ptr(self.buf) as *uint
}
}
@ -35,6 +50,15 @@ pub impl StackSegment {
}
}
impl Drop for StackSegment {
fn finalize(&self) {
unsafe {
// XXX: Using the FFI to call a C macro. Slow
rust_valgrind_stack_deregister(self.valgrind_id);
}
}
}
pub struct StackPool(());
impl StackPool {
@ -47,3 +71,8 @@ impl StackPool {
fn give_segment(&self, _stack: StackSegment) {
}
}
extern {
fn rust_valgrind_stack_register(start: *uintptr_t, end: *uintptr_t) -> c_uint;
fn rust_valgrind_stack_deregister(id: c_uint);
}

View File

@ -18,17 +18,17 @@ use rt::local_services::LocalServices;
/// will abort the process.
pub fn run_in_newsched_task(f: ~fn()) {
use unstable::run_in_bare_thread;
use super::sched::Task;
use super::uvio::UvEventLoop;
use super::sched::Coroutine;
use rt::uv::uvio::UvEventLoop;
let f = Cell(f);
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~Task::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f.take());
sched.task_queue.push_back(task);
let task = ~Coroutine::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f.take());
sched.enqueue_task(task);
sched.run();
}
}
@ -38,9 +38,9 @@ pub fn spawntask(f: ~fn()) {
use super::sched::*;
let mut sched = local_sched::take();
let task = ~Task::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f);
let task = ~Coroutine::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f);
do sched.switch_running_tasks_and_then(task) |task| {
let task = Cell(task);
let sched = local_sched::take();
@ -53,17 +53,57 @@ pub fn spawntask_immediately(f: ~fn()) {
use super::sched::*;
let mut sched = local_sched::take();
let task = ~Task::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f);
let task = ~Coroutine::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f);
do sched.switch_running_tasks_and_then(task) |task| {
let task = Cell(task);
do local_sched::borrow |sched| {
sched.task_queue.push_front(task.take());
sched.enqueue_task(task.take());
}
}
}
/// Create a new task and run it right now. Aborts on failure
pub fn spawntask_later(f: ~fn()) {
use super::sched::*;
let mut sched = local_sched::take();
let task = ~Coroutine::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f);
sched.enqueue_task(task);
local_sched::put(sched);
}
/// Spawn a task and either run it immediately or run it later
pub fn spawntask_random(f: ~fn()) {
use super::sched::*;
use rand::{Rand, rng};
let mut rng = rng();
let run_now: bool = Rand::rand(&mut rng);
let mut sched = local_sched::take();
let task = ~Coroutine::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f);
if run_now {
do sched.switch_running_tasks_and_then(task) |task| {
let task = Cell(task);
do local_sched::borrow |sched| {
sched.enqueue_task(task.take());
}
}
} else {
sched.enqueue_task(task);
local_sched::put(sched);
}
}
/// Spawn a task and wait for it to finish, returning whether it completed successfully or failed
pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
use cell::Cell;
@ -82,7 +122,7 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
let old_task = Cell(old_task);
let f = f.take();
let mut sched = local_sched::take();
let new_task = ~do Task::new(&mut sched.stack_pool) {
let new_task = ~do Coroutine::new(&mut sched.stack_pool) {
do (|| {
(f.take())()
}).finally {
@ -92,7 +132,7 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
do sched.switch_running_tasks_and_then(old_task.take()) |new_task| {
let new_task = Cell(new_task);
do local_sched::borrow |sched| {
sched.task_queue.push_front(new_task.take());
sched.enqueue_task(new_task.take());
}
}
}

View File

@ -46,8 +46,11 @@ type pthread_key_t = ::libc::c_uint;
#[cfg(unix)]
extern {
#[fast_ffi]
fn pthread_key_create(key: *mut pthread_key_t, dtor: *u8) -> c_int;
#[fast_ffi]
fn pthread_setspecific(key: pthread_key_t, value: *mut c_void) -> c_int;
#[fast_ffi]
fn pthread_getspecific(key: pthread_key_t) -> *mut c_void;
}

184
src/libcore/rt/tube.rs Normal file
View File

@ -0,0 +1,184 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! A very simple unsynchronized channel type for sending buffered data from
//! scheduler context to task context.
//!
//! XXX: This would be safer to use if split into two types like Port/Chan
use option::*;
use clone::Clone;
use super::rc::RC;
use rt::sched::Coroutine;
use rt::{context, TaskContext, SchedulerContext};
use rt::local_sched;
use vec::OwnedVector;
use container::Container;
struct TubeState<T> {
blocked_task: Option<~Coroutine>,
buf: ~[T]
}
pub struct Tube<T> {
p: RC<TubeState<T>>
}
impl<T> Tube<T> {
pub fn new() -> Tube<T> {
Tube {
p: RC::new(TubeState {
blocked_task: None,
buf: ~[]
})
}
}
pub fn send(&mut self, val: T) {
rtdebug!("tube send");
assert!(context() == SchedulerContext);
unsafe {
let state = self.p.unsafe_borrow_mut();
(*state).buf.push(val);
if (*state).blocked_task.is_some() {
// There's a waiting task. Wake it up
rtdebug!("waking blocked tube");
let task = (*state).blocked_task.swap_unwrap();
let sched = local_sched::take();
sched.resume_task_immediately(task);
}
}
}
pub fn recv(&mut self) -> T {
assert!(context() == TaskContext);
unsafe {
let state = self.p.unsafe_borrow_mut();
if !(*state).buf.is_empty() {
return (*state).buf.shift();
} else {
// Block and wait for the next message
rtdebug!("blocking on tube recv");
assert!(self.p.refcount() > 1); // There better be somebody to wake us up
assert!((*state).blocked_task.is_none());
let sched = local_sched::take();
do sched.deschedule_running_task_and_then |task| {
(*state).blocked_task = Some(task);
}
rtdebug!("waking after tube recv");
let buf = &mut (*state).buf;
assert!(!buf.is_empty());
return buf.shift();
}
}
}
}
impl<T> Clone for Tube<T> {
fn clone(&self) -> Tube<T> {
Tube { p: self.p.clone() }
}
}
#[cfg(test)]
mod test {
use int;
use cell::Cell;
use rt::local_sched;
use rt::test::*;
use rt::rtio::EventLoop;
use super::*;
#[test]
fn simple_test() {
do run_in_newsched_task {
let mut tube: Tube<int> = Tube::new();
let tube_clone = tube.clone();
let tube_clone_cell = Cell(tube_clone);
let sched = local_sched::take();
do sched.deschedule_running_task_and_then |task| {
let mut tube_clone = tube_clone_cell.take();
tube_clone.send(1);
let sched = local_sched::take();
sched.resume_task_immediately(task);
}
assert!(tube.recv() == 1);
}
}
#[test]
fn blocking_test() {
do run_in_newsched_task {
let mut tube: Tube<int> = Tube::new();
let tube_clone = tube.clone();
let tube_clone = Cell(Cell(Cell(tube_clone)));
let sched = local_sched::take();
do sched.deschedule_running_task_and_then |task| {
let tube_clone = tube_clone.take();
do local_sched::borrow |sched| {
let tube_clone = tube_clone.take();
do sched.event_loop.callback {
let mut tube_clone = tube_clone.take();
// The task should be blocked on this now and
// sending will wake it up.
tube_clone.send(1);
}
}
let sched = local_sched::take();
sched.resume_task_immediately(task);
}
assert!(tube.recv() == 1);
}
}
#[test]
fn many_blocking_test() {
static MAX: int = 100;
do run_in_newsched_task {
let mut tube: Tube<int> = Tube::new();
let tube_clone = tube.clone();
let tube_clone = Cell(tube_clone);
let sched = local_sched::take();
do sched.deschedule_running_task_and_then |task| {
callback_send(tube_clone.take(), 0);
fn callback_send(tube: Tube<int>, i: int) {
if i == 100 { return; }
let tube = Cell(Cell(tube));
do local_sched::borrow |sched| {
let tube = tube.take();
do sched.event_loop.callback {
let mut tube = tube.take();
// The task should be blocked on this now and
// sending will wake it up.
tube.send(i);
callback_send(tube, i + 1);
}
}
}
let sched = local_sched::take();
sched.resume_task_immediately(task);
}
for int::range(0, MAX) |i| {
let j = tube.recv();
assert!(j == i);
}
}
}
}

View File

@ -11,15 +11,11 @@
use prelude::*;
use ptr::null;
use libc::c_void;
use super::{UvError, Callback, Request, NativeHandle, Loop};
use super::super::uvll;
use super::super::uvll::*;
pub type FsCallback = ~fn(FsRequest, Option<UvError>);
impl Callback for FsCallback { }
use rt::uv::{Request, NativeHandle, Loop, FsCallback};
use rt::uv::uvll;
use rt::uv::uvll::*;
pub struct FsRequest(*uvll::uv_fs_t);
impl Request for FsRequest;
impl FsRequest {

91
src/libcore/rt/uv/idle.rs Normal file
View File

@ -0,0 +1,91 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use libc::c_int;
use option::Some;
use rt::uv::uvll;
use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback, NullCallback};
use rt::uv::status_to_maybe_uv_error;
pub struct IdleWatcher(*uvll::uv_idle_t);
impl Watcher for IdleWatcher { }
pub impl IdleWatcher {
fn new(loop_: &mut Loop) -> IdleWatcher {
unsafe {
let handle = uvll::idle_new();
assert!(handle.is_not_null());
assert!(0 == uvll::idle_init(loop_.native_handle(), handle));
let mut watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
watcher.install_watcher_data();
return watcher
}
}
fn start(&mut self, cb: IdleCallback) {
{
let data = self.get_watcher_data();
data.idle_cb = Some(cb);
}
unsafe {
assert!(0 == uvll::idle_start(self.native_handle(), idle_cb))
};
extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
let data = idle_watcher.get_watcher_data();
let cb: &IdleCallback = data.idle_cb.get_ref();
let status = status_to_maybe_uv_error(handle, status);
(*cb)(idle_watcher, status);
}
}
fn stop(&mut self) {
// NB: Not resetting the Rust idle_cb to None here because `stop` is likely
// called from *within* the idle callback, causing a use after free
unsafe {
assert!(0 == uvll::idle_stop(self.native_handle()));
}
}
fn close(self, cb: NullCallback) {
{
let mut this = self;
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(self.native_handle(), close_cb) };
extern fn close_cb(handle: *uvll::uv_idle_t) {
unsafe {
let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
{
let data = idle_watcher.get_watcher_data();
data.close_cb.swap_unwrap()();
}
idle_watcher.drop_watcher_data();
uvll::idle_delete(handle);
}
}
}
}
impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {
fn from_native_handle(handle: *uvll::uv_idle_t) -> IdleWatcher {
IdleWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_idle_t {
match self { &IdleWatcher(ptr) => ptr }
}
}

View File

@ -10,7 +10,7 @@
/*!
Bindings to libuv.
Bindings to libuv, along with the default implementation of `core::rt::rtio`.
UV types consist of the event loop (Loop), Watchers, Requests and
Callbacks.
@ -38,29 +38,46 @@ use container::Container;
use option::*;
use str::raw::from_c_str;
use to_str::ToStr;
use ptr::Ptr;
use libc;
use vec;
use ptr;
use ptr::Ptr;
use cast;
use str;
use option::*;
use str::raw::from_c_str;
use to_str::ToStr;
use libc::{c_void, c_int, size_t, malloc, free};
use cast::transmute;
use ptr::null;
use super::uvll;
use unstable::finally::Finally;
use rt::io::IoError;
#[cfg(test)] use unstable::run_in_bare_thread;
pub use self::file::{FsRequest, FsCallback};
pub use self::file::FsRequest;
pub use self::net::{StreamWatcher, TcpWatcher};
pub use self::net::{ReadCallback, AllocCallback, ConnectionCallback, ConnectCallback};
pub use self::idle::IdleWatcher;
pub use self::timer::TimerWatcher;
/// The implementation of `rtio` for libuv
pub mod uvio;
/// C bindings to libuv
pub mod uvll;
pub mod file;
pub mod net;
pub mod idle;
pub mod timer;
/// A trait for callbacks to implement. Provides a little extra type safety
/// for generic, unsafe interop functions like `set_watcher_callback`.
pub trait Callback { }
pub trait Request { }
/// XXX: Loop(*handle) is buggy with destructors. Normal structs
/// with dtors may not be destructured, but tuple structs can,
/// but the results are not correct.
pub struct Loop {
handle: *uvll::uv_loop_t
}
/// The trait implemented by uv 'watchers' (handles). Watchers are
/// non-owning wrappers around the uv handles and are not completely
@ -68,12 +85,9 @@ pub trait Request { }
/// handle. Watchers are generally created, then `start`ed, `stop`ed
/// and `close`ed, but due to their complex life cycle may not be
/// entirely memory safe if used in unanticipated patterns.
pub trait Watcher {
fn event_loop(&self) -> Loop;
}
pub trait Watcher { }
pub type NullCallback = ~fn();
impl Callback for NullCallback { }
pub trait Request { }
/// A type that wraps a native handle
pub trait NativeHandle<T> {
@ -81,13 +95,6 @@ pub trait NativeHandle<T> {
pub fn native_handle(&self) -> T;
}
/// XXX: Loop(*handle) is buggy with destructors. Normal structs
/// with dtors may not be destructured, but tuple structs can,
/// but the results are not correct.
pub struct Loop {
handle: *uvll::uv_loop_t
}
pub impl Loop {
fn new() -> Loop {
let handle = unsafe { uvll::loop_new() };
@ -113,64 +120,74 @@ impl NativeHandle<*uvll::uv_loop_t> for Loop {
}
}
pub struct IdleWatcher(*uvll::uv_idle_t);
impl Watcher for IdleWatcher {
fn event_loop(&self) -> Loop {
loop_from_watcher(self)
}
}
// XXX: The uv alloc callback also has a *uv_handle_t arg
pub type AllocCallback = ~fn(uint) -> Buf;
pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option<UvError>);
pub type NullCallback = ~fn();
pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
impl Callback for IdleCallback { }
pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
pub type FsCallback = ~fn(FsRequest, Option<UvError>);
pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>);
pub impl IdleWatcher {
fn new(loop_: &mut Loop) -> IdleWatcher {
unsafe {
let handle = uvll::idle_new();
assert!(handle.is_not_null());
assert!(0 == uvll::idle_init(loop_.native_handle(), handle));
uvll::set_data_for_uv_handle(handle, null::<()>());
NativeHandle::from_native_handle(handle)
}
}
fn start(&mut self, cb: IdleCallback) {
set_watcher_callback(self, cb);
unsafe {
assert!(0 == uvll::idle_start(self.native_handle(), idle_cb))
};
extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
let idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
let cb: &IdleCallback = borrow_callback_from_watcher(&idle_watcher);
let status = status_to_maybe_uv_error(handle, status);
(*cb)(idle_watcher, status);
}
}
fn stop(&mut self) {
unsafe { assert!(0 == uvll::idle_stop(self.native_handle())); }
}
fn close(self) {
unsafe { uvll::close(self.native_handle(), close_cb) };
extern fn close_cb(handle: *uvll::uv_idle_t) {
let mut idle_watcher = NativeHandle::from_native_handle(handle);
drop_watcher_callback::<uvll::uv_idle_t, IdleWatcher, IdleCallback>(&mut idle_watcher);
unsafe { uvll::idle_delete(handle) };
}
}
/// Callbacks used by StreamWatchers, set as custom data on the foreign handle
struct WatcherData {
read_cb: Option<ReadCallback>,
write_cb: Option<ConnectionCallback>,
connect_cb: Option<ConnectionCallback>,
close_cb: Option<NullCallback>,
alloc_cb: Option<AllocCallback>,
idle_cb: Option<IdleCallback>,
timer_cb: Option<TimerCallback>
}
impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {
fn from_native_handle(handle: *uvll::uv_idle_t) -> IdleWatcher {
IdleWatcher(handle)
pub trait WatcherInterop {
fn event_loop(&self) -> Loop;
fn install_watcher_data(&mut self);
fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData;
fn drop_watcher_data(&mut self);
}
impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
/// Get the uv event loop from a Watcher
pub fn event_loop(&self) -> Loop {
unsafe {
let handle = self.native_handle();
let loop_ = uvll::get_loop_for_uv_handle(handle);
NativeHandle::from_native_handle(loop_)
}
}
fn native_handle(&self) -> *uvll::uv_idle_t {
match self { &IdleWatcher(ptr) => ptr }
pub fn install_watcher_data(&mut self) {
unsafe {
let data = ~WatcherData {
read_cb: None,
write_cb: None,
connect_cb: None,
close_cb: None,
alloc_cb: None,
idle_cb: None,
timer_cb: None
};
let data = transmute::<~WatcherData, *c_void>(data);
uvll::set_data_for_uv_handle(self.native_handle(), data);
}
}
pub fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData {
unsafe {
let data = uvll::get_data_for_uv_handle(self.native_handle());
let data = transmute::<&*c_void, &mut ~WatcherData>(&data);
return &mut **data;
}
}
pub fn drop_watcher_data(&mut self) {
unsafe {
let data = uvll::get_data_for_uv_handle(self.native_handle());
let _data = transmute::<*c_void, ~WatcherData>(data);
uvll::set_data_for_uv_handle(self.native_handle(), null::<()>());
}
}
}
@ -198,6 +215,10 @@ pub impl UvError {
from_c_str(desc_str)
}
}
fn is_eof(&self) -> bool {
self.code == uvll::EOF
}
}
impl ToStr for UvError {
@ -213,6 +234,59 @@ fn error_smoke_test() {
assert!(err.to_str() == ~"EOF: end of file");
}
pub fn last_uv_error<H, W: Watcher + NativeHandle<*H>>(watcher: &W) -> UvError {
unsafe {
let loop_ = watcher.event_loop();
UvError(uvll::last_error(loop_.native_handle()))
}
}
pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
// XXX: Could go in str::raw
unsafe fn c_str_to_static_slice(s: *libc::c_char) -> &'static str {
let s = s as *u8;
let mut curr = s, len = 0u;
while *curr != 0u8 {
len += 1u;
curr = ptr::offset(s, len);
}
str::raw::buf_as_slice(s, len, |d| cast::transmute(d))
}
unsafe {
// Importing error constants
use rt::uv::uvll::*;
use rt::io::*;
// uv error descriptions are static
let c_desc = uvll::strerror(&*uverr);
let desc = c_str_to_static_slice(c_desc);
let kind = match uverr.code {
UNKNOWN => OtherIoError,
OK => OtherIoError,
EOF => EndOfFile,
EACCES => PermissionDenied,
ECONNREFUSED => ConnectionRefused,
ECONNRESET => ConnectionReset,
EPIPE => BrokenPipe,
e => {
rtdebug!("e %u", e as uint);
// XXX: Need to map remaining uv error types
OtherIoError
}
};
IoError {
kind: kind,
desc: desc,
detail: None
}
}
}
/// Given a uv handle, convert a callback status to a UvError
// XXX: Follow the pattern below by parameterizing over T: Watcher, not T
@ -230,133 +304,6 @@ pub fn status_to_maybe_uv_error<T>(handle: *T, status: c_int) -> Option<UvError>
}
}
/// Get the uv event loop from a Watcher
pub fn loop_from_watcher<H, W: Watcher + NativeHandle<*H>>(
watcher: &W) -> Loop {
let handle = watcher.native_handle();
let loop_ = unsafe { uvll::get_loop_for_uv_handle(handle) };
NativeHandle::from_native_handle(loop_)
}
/// Set the custom data on a handle to a callback Note: This is only
/// suitable for watchers that make just one type of callback. For
/// others use WatcherData
pub fn set_watcher_callback<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
watcher: &mut W, cb: CB) {
drop_watcher_callback::<H, W, CB>(watcher);
// XXX: Boxing the callback so it fits into a
// pointer. Unfortunate extra allocation
let boxed_cb = ~cb;
let data = unsafe { transmute::<~CB, *c_void>(boxed_cb) };
unsafe { uvll::set_data_for_uv_handle(watcher.native_handle(), data) };
}
/// Delete a callback from a handle's custom data
pub fn drop_watcher_callback<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
watcher: &mut W) {
unsafe {
let handle = watcher.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
if handle_data.is_not_null() {
// Take ownership of the callback and drop it
let _cb = transmute::<*c_void, ~CB>(handle_data);
// Make sure the pointer is zeroed
uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>());
}
}
}
/// Take a pointer to the callback installed as custom data
pub fn borrow_callback_from_watcher<H, W: Watcher + NativeHandle<*H>,
CB: Callback>(watcher: &W) -> &CB {
unsafe {
let handle = watcher.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
assert!(handle_data.is_not_null());
let cb = transmute::<&*c_void, &~CB>(&handle_data);
return &**cb;
}
}
/// Take ownership of the callback installed as custom data
pub fn take_callback_from_watcher<H, W: Watcher + NativeHandle<*H>, CB: Callback>(
watcher: &mut W) -> CB {
unsafe {
let handle = watcher.native_handle();
let handle_data: *c_void = uvll::get_data_for_uv_handle(handle);
assert!(handle_data.is_not_null());
uvll::set_data_for_uv_handle(handle, null::<()>());
let cb: ~CB = transmute::<*c_void, ~CB>(handle_data);
let cb = match cb { ~cb => cb };
return cb;
}
}
/// Callbacks used by StreamWatchers, set as custom data on the foreign handle
struct WatcherData {
read_cb: Option<ReadCallback>,
write_cb: Option<ConnectionCallback>,
connect_cb: Option<ConnectionCallback>,
close_cb: Option<NullCallback>,
alloc_cb: Option<AllocCallback>,
buf: Option<Buf>
}
pub fn install_watcher_data<H, W: Watcher + NativeHandle<*H>>(watcher: &mut W) {
unsafe {
let data = ~WatcherData {
read_cb: None,
write_cb: None,
connect_cb: None,
close_cb: None,
alloc_cb: None,
buf: None
};
let data = transmute::<~WatcherData, *c_void>(data);
uvll::set_data_for_uv_handle(watcher.native_handle(), data);
}
}
pub fn get_watcher_data<'r, H, W: Watcher + NativeHandle<*H>>(
watcher: &'r mut W) -> &'r mut WatcherData {
unsafe {
let data = uvll::get_data_for_uv_handle(watcher.native_handle());
let data = transmute::<&*c_void, &mut ~WatcherData>(&data);
return &mut **data;
}
}
pub fn drop_watcher_data<H, W: Watcher + NativeHandle<*H>>(watcher: &mut W) {
unsafe {
let data = uvll::get_data_for_uv_handle(watcher.native_handle());
let _data = transmute::<*c_void, ~WatcherData>(data);
uvll::set_data_for_uv_handle(watcher.native_handle(), null::<()>());
}
}
#[test]
fn test_slice_to_uv_buf() {
let slice = [0, .. 20];
let buf = slice_to_uv_buf(slice);
assert!(buf.len == 20);
unsafe {
let base = transmute::<*u8, *mut u8>(buf.base);
(*base) = 1;
(*ptr::mut_offset(base, 1)) = 2;
}
assert!(slice[0] == 1);
assert!(slice[1] == 2);
}
/// The uv buffer type
pub type Buf = uvll::uv_buf_t;
@ -394,6 +341,24 @@ pub fn vec_from_uv_buf(buf: Buf) -> Option<~[u8]> {
}
}
#[test]
fn test_slice_to_uv_buf() {
let slice = [0, .. 20];
let buf = slice_to_uv_buf(slice);
assert!(buf.len == 20);
unsafe {
let base = transmute::<*u8, *mut u8>(buf.base);
(*base) = 1;
(*ptr::mut_offset(base, 1)) = 2;
}
assert!(slice[0] == 1);
assert!(slice[1] == 2);
}
#[test]
fn loop_smoke_test() {
do run_in_bare_thread {
@ -409,7 +374,7 @@ fn idle_new_then_close() {
do run_in_bare_thread {
let mut loop_ = Loop::new();
let idle_watcher = { IdleWatcher::new(&mut loop_) };
idle_watcher.close();
idle_watcher.close(||());
}
}
@ -425,7 +390,7 @@ fn idle_smoke_test() {
assert!(status.is_none());
if unsafe { *count_ptr == 10 } {
idle_watcher.stop();
idle_watcher.close();
idle_watcher.close(||());
} else {
unsafe { *count_ptr = *count_ptr + 1; }
}
@ -449,7 +414,7 @@ fn idle_start_stop_start() {
assert!(status.is_none());
let mut idle_watcher = idle_watcher;
idle_watcher.stop();
idle_watcher.close();
idle_watcher.close(||());
}
}
loop_.run();

View File

@ -10,21 +10,15 @@
use prelude::*;
use libc::{size_t, ssize_t, c_int, c_void};
use cast::transmute_mut_region;
use super::super::uvll;
use super::super::uvll::*;
use super::{Loop, Watcher, Request, UvError, Buf, Callback, NativeHandle, NullCallback,
loop_from_watcher, status_to_maybe_uv_error,
install_watcher_data, get_watcher_data, drop_watcher_data,
vec_to_uv_buf, vec_from_uv_buf};
use super::super::io::net::ip::{IpAddr, Ipv4, Ipv6};
use rt::uv::uvll;
use rt::uv::uvll::*;
use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback};
use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
status_to_maybe_uv_error};
use rt::io::net::ip::{IpAddr, Ipv4, Ipv6};
use rt::uv::last_uv_error;
#[cfg(test)] use cell::Cell;
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use super::super::thread::Thread;
#[cfg(test)] use super::super::test::*;
fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) {
fn ip4_as_uv_ip4<T>(addr: IpAddr, f: &fn(*sockaddr_in) -> T) -> T {
match addr {
Ipv4(a, b, c, d, p) => {
unsafe {
@ -34,7 +28,7 @@ fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) {
c as uint,
d as uint), p as int);
do (|| {
f(addr);
f(addr)
}).finally {
free_ip4_addr(addr);
}
@ -47,34 +41,23 @@ fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) {
// uv_stream t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t
// and uv_file_t
pub struct StreamWatcher(*uvll::uv_stream_t);
impl Watcher for StreamWatcher {
fn event_loop(&self) -> Loop {
loop_from_watcher(self)
}
}
pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option<UvError>);
impl Callback for ReadCallback { }
// XXX: The uv alloc callback also has a *uv_handle_t arg
pub type AllocCallback = ~fn(uint) -> Buf;
impl Callback for AllocCallback { }
impl Watcher for StreamWatcher { }
pub impl StreamWatcher {
fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) {
// XXX: Borrowchk problems
let data = get_watcher_data(unsafe { transmute_mut_region(self) });
data.alloc_cb = Some(alloc);
data.read_cb = Some(cb);
{
let data = self.get_watcher_data();
data.alloc_cb = Some(alloc);
data.read_cb = Some(cb);
}
let handle = self.native_handle();
unsafe { uvll::read_start(handle, alloc_cb, read_cb); }
extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
let data = get_watcher_data(&mut stream_watcher);
let data = stream_watcher.get_watcher_data();
let alloc_cb = data.alloc_cb.get_ref();
return (*alloc_cb)(suggested_size as uint);
}
@ -83,7 +66,7 @@ pub impl StreamWatcher {
rtdebug!("buf addr: %x", buf.base as uint);
rtdebug!("buf len: %d", buf.len as int);
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
let data = get_watcher_data(&mut stream_watcher);
let data = stream_watcher.get_watcher_data();
let cb = data.read_cb.get_ref();
let status = status_to_maybe_uv_error(stream, nread as c_int);
(*cb)(stream_watcher, nread as int, buf, status);
@ -98,22 +81,19 @@ pub impl StreamWatcher {
unsafe { uvll::read_stop(handle); }
}
// XXX: Needs to take &[u8], not ~[u8]
fn write(&mut self, msg: ~[u8], cb: ConnectionCallback) {
// XXX: Borrowck
let data = get_watcher_data(unsafe { transmute_mut_region(self) });
assert!(data.write_cb.is_none());
data.write_cb = Some(cb);
fn write(&mut self, buf: Buf, cb: ConnectionCallback) {
{
let data = self.get_watcher_data();
assert!(data.write_cb.is_none());
data.write_cb = Some(cb);
}
let req = WriteRequest::new();
let buf = vec_to_uv_buf(msg);
assert!(data.buf.is_none());
data.buf = Some(buf);
let bufs = [buf];
unsafe {
assert!(0 == uvll::write(req.native_handle(),
self.native_handle(),
bufs, write_cb));
self.native_handle(),
bufs, write_cb));
}
extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
@ -121,8 +101,7 @@ pub impl StreamWatcher {
let mut stream_watcher = write_request.stream();
write_request.delete();
let cb = {
let data = get_watcher_data(&mut stream_watcher);
let _vec = vec_from_uv_buf(data.buf.swap_unwrap());
let data = stream_watcher.get_watcher_data();
let cb = data.write_cb.swap_unwrap();
cb
};
@ -142,7 +121,7 @@ pub impl StreamWatcher {
fn close(self, cb: NullCallback) {
{
let mut this = self;
let data = get_watcher_data(&mut this);
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
@ -152,9 +131,10 @@ pub impl StreamWatcher {
extern fn close_cb(handle: *uvll::uv_stream_t) {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
{
get_watcher_data(&mut stream_watcher).close_cb.swap_unwrap()();
let data = stream_watcher.get_watcher_data();
data.close_cb.swap_unwrap()();
}
drop_watcher_data(&mut stream_watcher);
stream_watcher.drop_watcher_data();
unsafe { free_handle(handle as *c_void) }
}
}
@ -171,15 +151,7 @@ impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher {
}
pub struct TcpWatcher(*uvll::uv_tcp_t);
impl Watcher for TcpWatcher {
fn event_loop(&self) -> Loop {
loop_from_watcher(self)
}
}
pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
impl Callback for ConnectionCallback { }
impl Watcher for TcpWatcher { }
pub impl TcpWatcher {
fn new(loop_: &mut Loop) -> TcpWatcher {
@ -187,21 +159,24 @@ pub impl TcpWatcher {
let handle = malloc_handle(UV_TCP);
assert!(handle.is_not_null());
assert!(0 == uvll::tcp_init(loop_.native_handle(), handle));
let mut watcher = NativeHandle::from_native_handle(handle);
install_watcher_data(&mut watcher);
let mut watcher: TcpWatcher = NativeHandle::from_native_handle(handle);
watcher.install_watcher_data();
return watcher;
}
}
fn bind(&mut self, address: IpAddr) {
fn bind(&mut self, address: IpAddr) -> Result<(), UvError> {
match address {
Ipv4(*) => {
do ip4_as_uv_ip4(address) |addr| {
let result = unsafe {
uvll::tcp_bind(self.native_handle(), addr)
};
// XXX: bind is likely to fail. need real error handling
assert!(result == 0);
if result == 0 {
Ok(())
} else {
Err(last_uv_error(self))
}
}
}
_ => fail!()
@ -210,8 +185,8 @@ pub impl TcpWatcher {
fn connect(&mut self, address: IpAddr, cb: ConnectionCallback) {
unsafe {
assert!(get_watcher_data(self).connect_cb.is_none());
get_watcher_data(self).connect_cb = Some(cb);
assert!(self.get_watcher_data().connect_cb.is_none());
self.get_watcher_data().connect_cb = Some(cb);
let connect_handle = ConnectRequest::new().native_handle();
match address {
@ -232,7 +207,7 @@ pub impl TcpWatcher {
let mut stream_watcher = connect_request.stream();
connect_request.delete();
let cb: ConnectionCallback = {
let data = get_watcher_data(&mut stream_watcher);
let data = stream_watcher.get_watcher_data();
data.connect_cb.swap_unwrap()
};
let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
@ -242,10 +217,11 @@ pub impl TcpWatcher {
}
fn listen(&mut self, cb: ConnectionCallback) {
// XXX: Borrowck
let data = get_watcher_data(unsafe { transmute_mut_region(self) });
assert!(data.connect_cb.is_none());
data.connect_cb = Some(cb);
{
let data = self.get_watcher_data();
assert!(data.connect_cb.is_none());
data.connect_cb = Some(cb);
}
unsafe {
static BACKLOG: c_int = 128; // XXX should be configurable
@ -257,9 +233,10 @@ pub impl TcpWatcher {
extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) {
rtdebug!("connection_cb");
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
let cb = get_watcher_data(&mut stream_watcher).connect_cb.swap_unwrap();
let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
cb(stream_watcher, status);
let data = stream_watcher.get_watcher_data();
let cb = data.connect_cb.get_ref();
let status = status_to_maybe_uv_error(handle, status);
(*cb)(stream_watcher, status);
}
}
@ -277,12 +254,8 @@ impl NativeHandle<*uvll::uv_tcp_t> for TcpWatcher {
}
}
pub type ConnectCallback = ~fn(ConnectRequest, Option<UvError>);
impl Callback for ConnectCallback { }
// uv_connect_t is a subclass of uv_req_t
struct ConnectRequest(*uvll::uv_connect_t);
impl Request for ConnectRequest { }
impl ConnectRequest {
@ -355,93 +328,109 @@ impl NativeHandle<*uvll::uv_write_t> for WriteRequest {
}
#[test]
fn connect_close() {
do run_in_bare_thread() {
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
// Connect to a port where nobody is listening
let addr = next_test_ip4();
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("tcp_watcher.connect!");
assert!(status.is_some());
assert!(status.get().name() == ~"ECONNREFUSED");
stream_watcher.close(||());
}
loop_.run();
loop_.close();
}
}
#[cfg(test)]
mod test {
use super::*;
use util::ignore;
use cell::Cell;
use vec;
use unstable::run_in_bare_thread;
use rt::thread::Thread;
use rt::test::*;
use rt::uv::{Loop, AllocCallback};
use rt::uv::{vec_from_uv_buf, vec_to_uv_buf, slice_to_uv_buf};
#[test]
fn listen() {
do run_in_bare_thread() {
static MAX: int = 10;
let mut loop_ = Loop::new();
let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
let addr = next_test_ip4();
server_tcp_watcher.bind(addr);
let loop_ = loop_;
rtdebug!("listening");
do server_tcp_watcher.listen |server_stream_watcher, status| {
rtdebug!("listened!");
assert!(status.is_none());
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ = loop_;
let client_tcp_watcher = TcpWatcher::new(&mut loop_);
let mut client_tcp_watcher = client_tcp_watcher.as_stream();
server_stream_watcher.accept(client_tcp_watcher);
let count_cell = Cell(0);
let server_stream_watcher = server_stream_watcher;
rtdebug!("starting read");
let alloc: AllocCallback = |size| {
vec_to_uv_buf(vec::from_elem(size, 0))
};
do client_tcp_watcher.read_start(alloc)
|stream_watcher, nread, buf, status| {
rtdebug!("i'm reading!");
let buf = vec_from_uv_buf(buf);
let mut count = count_cell.take();
if status.is_none() {
rtdebug!("got %d bytes", nread);
let buf = buf.unwrap();
for buf.slice(0, nread as uint).each |byte| {
assert!(*byte == count as u8);
rtdebug!("%u", *byte as uint);
count += 1;
}
} else {
assert!(count == MAX);
do stream_watcher.close {
server_stream_watcher.close(||());
}
}
count_cell.put_back(count);
}
}
let _client_thread = do Thread::start {
rtdebug!("starting client thread");
#[test]
fn connect_close() {
do run_in_bare_thread() {
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
// Connect to a port where nobody is listening
let addr = next_test_ip4();
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("connecting");
assert!(status.is_none());
let mut stream_watcher = stream_watcher;
let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
do stream_watcher.write(msg) |stream_watcher, status| {
rtdebug!("writing");
assert!(status.is_none());
stream_watcher.close(||());
}
rtdebug!("tcp_watcher.connect!");
assert!(status.is_some());
assert!(status.get().name() == ~"ECONNREFUSED");
stream_watcher.close(||());
}
loop_.run();
loop_.close();
};
}
}
let mut loop_ = loop_;
loop_.run();
loop_.close();
#[test]
fn listen() {
do run_in_bare_thread() {
static MAX: int = 10;
let mut loop_ = Loop::new();
let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
let addr = next_test_ip4();
server_tcp_watcher.bind(addr);
let loop_ = loop_;
rtdebug!("listening");
do server_tcp_watcher.listen |server_stream_watcher, status| {
rtdebug!("listened!");
assert!(status.is_none());
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ = loop_;
let client_tcp_watcher = TcpWatcher::new(&mut loop_);
let mut client_tcp_watcher = client_tcp_watcher.as_stream();
server_stream_watcher.accept(client_tcp_watcher);
let count_cell = Cell(0);
let server_stream_watcher = server_stream_watcher;
rtdebug!("starting read");
let alloc: AllocCallback = |size| {
vec_to_uv_buf(vec::from_elem(size, 0))
};
do client_tcp_watcher.read_start(alloc)
|stream_watcher, nread, buf, status| {
rtdebug!("i'm reading!");
let buf = vec_from_uv_buf(buf);
let mut count = count_cell.take();
if status.is_none() {
rtdebug!("got %d bytes", nread);
let buf = buf.unwrap();
for buf.slice(0, nread as uint).each |byte| {
assert!(*byte == count as u8);
rtdebug!("%u", *byte as uint);
count += 1;
}
} else {
assert!(count == MAX);
do stream_watcher.close {
server_stream_watcher.close(||());
}
}
count_cell.put_back(count);
}
}
let _client_thread = do Thread::start {
rtdebug!("starting client thread");
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("connecting");
assert!(status.is_none());
let mut stream_watcher = stream_watcher;
let msg = ~[0, 1, 2, 3, 4, 5, 6 ,7 ,8, 9];
let buf = slice_to_uv_buf(msg);
let msg_cell = Cell(msg);
do stream_watcher.write(buf) |stream_watcher, status| {
rtdebug!("writing");
assert!(status.is_none());
let msg_cell = Cell(msg_cell.take());
stream_watcher.close(||ignore(msg_cell.take()));
}
}
loop_.run();
loop_.close();
};
let mut loop_ = loop_;
loop_.run();
loop_.close();
}
}
}

183
src/libcore/rt/uv/timer.rs Normal file
View File

@ -0,0 +1,183 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use libc::{c_void, c_int};
use option::Some;
use rt::uv::uvll;
use rt::uv::{Watcher, Loop, NativeHandle, TimerCallback, NullCallback};
use rt::uv::status_to_maybe_uv_error;
pub struct TimerWatcher(*uvll::uv_timer_t);
impl Watcher for TimerWatcher { }
impl TimerWatcher {
pub fn new(loop_: &mut Loop) -> TimerWatcher {
unsafe {
let handle = uvll::malloc_handle(uvll::UV_TIMER);
assert!(handle.is_not_null());
assert!(0 == uvll::timer_init(loop_.native_handle(), handle));
let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle);
watcher.install_watcher_data();
return watcher;
}
}
pub fn start(&mut self, timeout: u64, repeat: u64, cb: TimerCallback) {
{
let data = self.get_watcher_data();
data.timer_cb = Some(cb);
}
unsafe {
uvll::timer_start(self.native_handle(), timer_cb, timeout, repeat);
}
extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) {
let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle);
let data = watcher.get_watcher_data();
let cb = data.timer_cb.get_ref();
let status = status_to_maybe_uv_error(handle, status);
(*cb)(watcher, status);
}
}
pub fn stop(&mut self) {
unsafe {
uvll::timer_stop(self.native_handle());
}
}
pub fn close(self, cb: NullCallback) {
let mut watcher = self;
{
let data = watcher.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe {
uvll::close(watcher.native_handle(), close_cb);
}
extern fn close_cb(handle: *uvll::uv_timer_t) {
let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle);
{
let data = watcher.get_watcher_data();
data.close_cb.swap_unwrap()();
}
watcher.drop_watcher_data();
unsafe {
uvll::free_handle(handle as *c_void);
}
}
}
}
impl NativeHandle<*uvll::uv_timer_t> for TimerWatcher {
fn from_native_handle(handle: *uvll::uv_timer_t) -> TimerWatcher {
TimerWatcher(handle)
}
fn native_handle(&self) -> *uvll::uv_idle_t {
match self { &TimerWatcher(ptr) => ptr }
}
}
#[cfg(test)]
mod test {
use super::*;
use rt::uv::Loop;
use unstable::run_in_bare_thread;
#[test]
fn smoke_test() {
do run_in_bare_thread {
let mut count = 0;
let count_ptr: *mut int = &mut count;
let mut loop_ = Loop::new();
let mut timer = TimerWatcher::new(&mut loop_);
do timer.start(10, 0) |timer, status| {
assert!(status.is_none());
unsafe { *count_ptr += 1 };
timer.close(||());
}
loop_.run();
loop_.close();
assert!(count == 1);
}
}
#[test]
fn start_twice() {
do run_in_bare_thread {
let mut count = 0;
let count_ptr: *mut int = &mut count;
let mut loop_ = Loop::new();
let mut timer = TimerWatcher::new(&mut loop_);
do timer.start(10, 0) |timer, status| {
let mut timer = timer;
assert!(status.is_none());
unsafe { *count_ptr += 1 };
do timer.start(10, 0) |timer, status| {
assert!(status.is_none());
unsafe { *count_ptr += 1 };
timer.close(||());
}
}
loop_.run();
loop_.close();
assert!(count == 2);
}
}
#[test]
fn repeat_stop() {
do run_in_bare_thread {
let mut count = 0;
let count_ptr: *mut int = &mut count;
let mut loop_ = Loop::new();
let mut timer = TimerWatcher::new(&mut loop_);
do timer.start(10, 20) |timer, status| {
assert!(status.is_none());
unsafe {
*count_ptr += 1;
if *count_ptr == 10 {
// Stop the timer and do something else
let mut timer = timer;
timer.stop();
// Freeze timer so it can be captured
let timer = timer;
let mut loop_ = timer.event_loop();
let mut timer2 = TimerWatcher::new(&mut loop_);
do timer2.start(10, 0) |timer2, _| {
unsafe { *count_ptr += 1; }
timer2.close(||());
// Restart the original timer
let mut timer = timer;
do timer.start(10, 0) |timer, _| {
unsafe { *count_ptr += 1; }
timer.close(||());
}
}
}
};
}
loop_.run();
loop_.close();
assert!(count == 12);
}
}
}

View File

@ -10,20 +10,24 @@
use option::*;
use result::*;
use super::io::net::ip::IpAddr;
use super::uv::*;
use super::rtio::*;
use ops::Drop;
use old_iter::CopyableIter;
use cell::{Cell, empty_cell};
use cast::transmute;
use super::sched::{Scheduler, local_sched};
use clone::Clone;
use rt::io::IoError;
use rt::io::net::ip::IpAddr;
use rt::uv::*;
use rt::uv::idle::IdleWatcher;
use rt::rtio::*;
use rt::sched::{Scheduler, local_sched};
use rt::io::{standard_error, OtherIoError};
use rt::tube::Tube;
#[cfg(test)] use container::Container;
#[cfg(test)] use uint;
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use super::test::*;
#[cfg(test)] use rt::test::*;
pub struct UvEventLoop {
uvio: UvIoFactory
@ -64,7 +68,16 @@ impl EventLoop for UvEventLoop {
assert!(status.is_none());
let mut idle_watcher = idle_watcher;
idle_watcher.stop();
idle_watcher.close();
idle_watcher.close(||());
f();
}
}
fn callback_ms(&mut self, ms: u64, f: ~fn()) {
let mut timer = TimerWatcher::new(self.uvio.uv_loop());
do timer.start(ms, 0) |timer, status| {
assert!(status.is_none());
timer.close(||());
f();
}
}
@ -100,11 +113,11 @@ impl IoFactory for UvIoFactory {
// Connect to an address and return a new stream
// NB: This blocks the task waiting on the connection.
// It would probably be better to return a future
fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject> {
fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError> {
// Create a cell in the task to hold the result. We will fill
// the cell before resuming the task.
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
let scheduler = local_sched::take();
assert!(scheduler.in_task_context());
@ -122,21 +135,26 @@ impl IoFactory for UvIoFactory {
// Wait for a connection
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("connect: in connect callback");
let maybe_stream = if status.is_none() {
if status.is_none() {
rtdebug!("status is none");
Some(~UvStream(stream_watcher))
let res = Ok(~UvTcpStream { watcher: stream_watcher });
// Store the stream in the task's stack
unsafe { (*result_cell_ptr).put_back(res); }
// Context switch
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
} else {
rtdebug!("status is some");
stream_watcher.close(||());
None
let task_cell = Cell(task_cell.take());
do stream_watcher.close {
let res = Err(uv_error_to_io_error(status.get()));
unsafe { (*result_cell_ptr).put_back(res); }
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
};
// Store the stream in the task's stack
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
// Context switch
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
}
@ -144,103 +162,124 @@ impl IoFactory for UvIoFactory {
return result_cell.take();
}
fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject> {
fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError> {
let mut watcher = TcpWatcher::new(self.uv_loop());
watcher.bind(addr);
return Some(~UvTcpListener(watcher));
match watcher.bind(addr) {
Ok(_) => Ok(~UvTcpListener::new(watcher)),
Err(uverr) => {
let scheduler = local_sched::take();
do scheduler.deschedule_running_task_and_then |task| {
let task_cell = Cell(task);
do watcher.as_stream().close {
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
}
Err(uv_error_to_io_error(uverr))
}
}
}
}
pub struct UvTcpListener(TcpWatcher);
// FIXME #6090: Prefer newtype structs but Drop doesn't work
pub struct UvTcpListener {
watcher: TcpWatcher,
listening: bool,
incoming_streams: Tube<Result<~RtioTcpStreamObject, IoError>>
}
impl UvTcpListener {
fn watcher(&self) -> TcpWatcher {
match self { &UvTcpListener(w) => w }
fn new(watcher: TcpWatcher) -> UvTcpListener {
UvTcpListener {
watcher: watcher,
listening: false,
incoming_streams: Tube::new()
}
}
fn close(&self) {
// XXX: Need to wait until close finishes before returning
self.watcher().as_stream().close(||());
}
fn watcher(&self) -> TcpWatcher { self.watcher }
}
impl Drop for UvTcpListener {
fn finalize(&self) {
// XXX: Again, this never gets called. Use .close() instead
//self.watcher().as_stream().close(||());
}
}
impl TcpListener for UvTcpListener {
fn listen(&mut self) -> Option<~StreamObject> {
rtdebug!("entering listen");
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
let server_tcp_watcher = self.watcher();
let watcher = self.watcher();
let scheduler = local_sched::take();
assert!(scheduler.in_task_context());
do scheduler.deschedule_running_task_and_then |task| {
let task_cell = Cell(task);
let mut server_tcp_watcher = server_tcp_watcher;
do server_tcp_watcher.listen |server_stream_watcher, status| {
let maybe_stream = if status.is_none() {
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ = loop_from_watcher(&server_stream_watcher);
let client_tcp_watcher = TcpWatcher::new(&mut loop_).as_stream();
// XXX: Needs to be surfaced in interface
server_stream_watcher.accept(client_tcp_watcher);
Some(~UvStream::new(client_tcp_watcher))
} else {
None
};
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
rtdebug!("resuming task from listen");
// Context switch
do watcher.as_stream().close {
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
return result_cell.take();
}
}
pub struct UvStream(StreamWatcher);
impl RtioTcpListener for UvTcpListener {
impl UvStream {
fn new(watcher: StreamWatcher) -> UvStream {
UvStream(watcher)
}
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> {
rtdebug!("entering listen");
fn watcher(&self) -> StreamWatcher {
match self { &UvStream(w) => w }
}
if self.listening {
return self.incoming_streams.recv();
}
// XXX: finalize isn't working for ~UvStream???
fn close(&self) {
// XXX: Need to wait until this finishes before returning
self.watcher().close(||());
self.listening = true;
let server_tcp_watcher = self.watcher();
let incoming_streams_cell = Cell(self.incoming_streams.clone());
let incoming_streams_cell = Cell(incoming_streams_cell.take());
let mut server_tcp_watcher = server_tcp_watcher;
do server_tcp_watcher.listen |server_stream_watcher, status| {
let maybe_stream = if status.is_none() {
let mut server_stream_watcher = server_stream_watcher;
let mut loop_ = server_stream_watcher.event_loop();
let client_tcp_watcher = TcpWatcher::new(&mut loop_);
let client_tcp_watcher = client_tcp_watcher.as_stream();
// XXX: Need's to be surfaced in interface
server_stream_watcher.accept(client_tcp_watcher);
Ok(~UvTcpStream { watcher: client_tcp_watcher })
} else {
Err(standard_error(OtherIoError))
};
let mut incoming_streams = incoming_streams_cell.take();
incoming_streams.send(maybe_stream);
incoming_streams_cell.put_back(incoming_streams);
}
return self.incoming_streams.recv();
}
}
impl Drop for UvStream {
// FIXME #6090: Prefer newtype structs but Drop doesn't work
pub struct UvTcpStream {
watcher: StreamWatcher
}
impl UvTcpStream {
fn watcher(&self) -> StreamWatcher { self.watcher }
}
impl Drop for UvTcpStream {
fn finalize(&self) {
rtdebug!("closing stream");
//self.watcher().close(||());
rtdebug!("closing tcp stream");
let watcher = self.watcher();
let scheduler = local_sched::take();
do scheduler.deschedule_running_task_and_then |task| {
let task_cell = Cell(task);
do watcher.close {
let scheduler = local_sched::take();
scheduler.resume_task_immediately(task_cell.take());
}
}
}
}
impl Stream for UvStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, ()> {
impl RtioTcpStream for UvTcpStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<uint, ()>> = &result_cell;
let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
let scheduler = local_sched::take();
assert!(scheduler.in_task_context());
@ -271,7 +310,7 @@ impl Stream for UvStream {
assert!(nread >= 0);
Ok(nread as uint)
} else {
Err(())
Err(uv_error_to_io_error(status.unwrap()))
};
unsafe { (*result_cell_ptr).put_back(result); }
@ -285,9 +324,9 @@ impl Stream for UvStream {
return result_cell.take();
}
fn write(&mut self, buf: &[u8]) -> Result<(), ()> {
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<(), ()>> = &result_cell;
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let scheduler = local_sched::take();
assert!(scheduler.in_task_context());
let watcher = self.watcher();
@ -295,14 +334,12 @@ impl Stream for UvStream {
do scheduler.deschedule_running_task_and_then |task| {
let mut watcher = watcher;
let task_cell = Cell(task);
let buf = unsafe { &*buf_ptr };
// XXX: OMGCOPIES
let buf = buf.to_vec();
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
do watcher.write(buf) |_watcher, status| {
let result = if status.is_none() {
Ok(())
} else {
Err(())
Err(uv_error_to_io_error(status.unwrap()))
};
unsafe { (*result_cell_ptr).put_back(result); }
@ -320,10 +357,12 @@ impl Stream for UvStream {
#[test]
fn test_simple_io_no_connect() {
do run_in_newsched_task {
let io = unsafe { local_sched::unsafe_borrow_io() };
let addr = next_test_ip4();
let maybe_chan = io.connect(addr);
assert!(maybe_chan.is_none());
unsafe {
let io = local_sched::unsafe_borrow_io();
let addr = next_test_ip4();
let maybe_chan = (*io).tcp_connect(addr);
assert!(maybe_chan.is_err());
}
}
}
@ -336,8 +375,8 @@ fn test_simple_tcp_server_and_client() {
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut listener = (*io).tcp_bind(addr).unwrap();
let mut stream = listener.accept().unwrap();
let mut buf = [0, .. 2048];
let nread = stream.read(buf).unwrap();
assert!(nread == 8);
@ -345,17 +384,14 @@ fn test_simple_tcp_server_and_client() {
rtdebug!("%u", buf[i] as uint);
assert!(buf[i] == i as u8);
}
stream.close();
listener.close();
}
}
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut stream = io.connect(addr).unwrap();
let mut stream = (*io).tcp_connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.close();
}
}
}
@ -368,8 +404,8 @@ fn test_read_and_block() {
do spawntask_immediately {
let io = unsafe { local_sched::unsafe_borrow_io() };
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
let mut stream = listener.accept().unwrap();
let mut buf = [0, .. 2048];
let expected = 32;
@ -392,26 +428,24 @@ fn test_read_and_block() {
do scheduler.deschedule_running_task_and_then |task| {
let task = Cell(task);
do local_sched::borrow |scheduler| {
scheduler.task_queue.push_back(task.take());
scheduler.enqueue_task(task.take());
}
}
}
// Make sure we had multiple reads
assert!(reads > 1);
stream.close();
listener.close();
}
do spawntask_immediately {
let io = unsafe { local_sched::unsafe_borrow_io() };
let mut stream = io.connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.close();
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut stream = (*io).tcp_connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
}
}
}
@ -426,34 +460,33 @@ fn test_read_read_read() {
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut listener = (*io).tcp_bind(addr).unwrap();
let mut stream = listener.accept().unwrap();
let buf = [1, .. 2048];
let mut total_bytes_written = 0;
while total_bytes_written < MAX {
stream.write(buf);
total_bytes_written += buf.len();
}
stream.close();
listener.close();
}
}
do spawntask_immediately {
let io = unsafe { local_sched::unsafe_borrow_io() };
let mut stream = io.connect(addr).unwrap();
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;
while total_bytes_read < MAX {
let nread = stream.read(buf).unwrap();
rtdebug!("read %u bytes", nread as uint);
total_bytes_read += nread;
for uint::range(0, nread) |i| {
assert!(buf[i] == 1);
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut stream = (*io).tcp_connect(addr).unwrap();
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;
while total_bytes_read < MAX {
let nread = stream.read(buf).unwrap();
rtdebug!("read %u bytes", nread as uint);
total_bytes_read += nread;
for uint::range(0, nread) |i| {
assert!(buf[i] == 1);
}
}
rtdebug!("read %u bytes total", total_bytes_read as uint);
}
rtdebug!("read %u bytes total", total_bytes_read as uint);
stream.close();
}
}
}

View File

@ -33,6 +33,15 @@ use libc::{size_t, c_int, c_uint, c_void, c_char, uintptr_t};
use libc::{malloc, free};
use prelude::*;
pub static UNKNOWN: c_int = -1;
pub static OK: c_int = 0;
pub static EOF: c_int = 1;
pub static EADDRINFO: c_int = 2;
pub static EACCES: c_int = 3;
pub static ECONNREFUSED: c_int = 12;
pub static ECONNRESET: c_int = 13;
pub static EPIPE: c_int = 36;
pub struct uv_err_t {
code: c_int,
sys_errno_: c_int
@ -260,9 +269,9 @@ pub unsafe fn buf_init(input: *u8, len: uint) -> uv_buf_t {
pub unsafe fn timer_init(loop_ptr: *c_void, timer_ptr: *uv_timer_t) -> c_int {
return rust_uv_timer_init(loop_ptr, timer_ptr);
}
pub unsafe fn timer_start(timer_ptr: *uv_timer_t, cb: *u8, timeout: uint,
repeat: uint) -> c_int {
return rust_uv_timer_start(timer_ptr, cb, timeout as c_uint, repeat as c_uint);
pub unsafe fn timer_start(timer_ptr: *uv_timer_t, cb: *u8, timeout: u64,
repeat: u64) -> c_int {
return rust_uv_timer_start(timer_ptr, cb, timeout, repeat);
}
pub unsafe fn timer_stop(timer_ptr: *uv_timer_t) -> c_int {
return rust_uv_timer_stop(timer_ptr);
@ -423,8 +432,8 @@ extern {
timer_handle: *uv_timer_t) -> c_int;
fn rust_uv_timer_start(timer_handle: *uv_timer_t,
cb: *u8,
timeout: c_uint,
repeat: c_uint) -> c_int;
timeout: libc::uint64_t,
repeat: libc::uint64_t) -> c_int;
fn rust_uv_timer_stop(handle: *uv_timer_t) -> c_int;
fn rust_uv_malloc_buf_base_of(sug_size: size_t) -> *u8;

View File

@ -46,4 +46,8 @@ pub impl<T> WorkQueue<T> {
None
}
}
fn is_empty(&self) -> bool {
return self.queue.is_empty();
}
}

View File

@ -202,10 +202,12 @@ impl FailWithCause for &'static str {
// FIXME #4427: Temporary until rt::rt_fail_ goes away
pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
use rt::{context, OldTaskContext};
use rt::local_services::unsafe_borrow_local_services;
use option::Option;
use rt::{context, OldTaskContext, TaskContext};
use rt::local_services::{unsafe_borrow_local_services, Unwinder};
match context() {
let context = context();
match context {
OldTaskContext => {
unsafe {
gc::cleanup_stack_for_failure();
@ -214,11 +216,26 @@ pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
}
}
_ => {
// XXX: Need to print the failure message
gc::cleanup_stack_for_failure();
unsafe {
// XXX: Bad re-allocations. fail! needs some refactoring
let msg = str::raw::from_c_str(msg);
let file = str::raw::from_c_str(file);
let outmsg = fmt!("%s at line %i of file %s", msg, line as int, file);
// XXX: Logging doesn't work correctly in non-task context because it
// invokes the local heap
if context == TaskContext {
error!(outmsg);
} else {
rtdebug!("%s", outmsg);
}
gc::cleanup_stack_for_failure();
let local_services = unsafe_borrow_local_services();
match local_services.unwinder {
let unwinder: &mut Option<Unwinder> = &mut (*local_services).unwinder;
match *unwinder {
Some(ref mut unwinder) => unwinder.begin_unwind(),
None => abort!("failure without unwinder. aborting process")
}

View File

@ -36,7 +36,7 @@ impl Handle {
}
_ => {
let local_services = unsafe_borrow_local_services();
NewHandle(&mut local_services.storage)
NewHandle(&mut (*local_services).storage)
}
}
}

View File

@ -43,6 +43,7 @@ use task::rt::{task_id, sched_id};
use util;
use util::replace;
use unstable::finally::Finally;
use rt::{context, OldTaskContext};
#[cfg(test)] use comm::SharedChan;
@ -558,23 +559,33 @@ pub fn get_scheduler() -> Scheduler {
* ~~~
*/
pub unsafe fn unkillable<U>(f: &fn() -> U) -> U {
let t = rt::rust_get_task();
do (|| {
rt::rust_task_inhibit_kill(t);
if context() == OldTaskContext {
let t = rt::rust_get_task();
do (|| {
rt::rust_task_inhibit_kill(t);
f()
}).finally {
rt::rust_task_allow_kill(t);
}
} else {
// FIXME #6377
f()
}).finally {
rt::rust_task_allow_kill(t);
}
}
/// The inverse of unkillable. Only ever to be used nested in unkillable().
pub unsafe fn rekillable<U>(f: &fn() -> U) -> U {
let t = rt::rust_get_task();
do (|| {
rt::rust_task_allow_kill(t);
if context() == OldTaskContext {
let t = rt::rust_get_task();
do (|| {
rt::rust_task_allow_kill(t);
f()
}).finally {
rt::rust_task_inhibit_kill(t);
}
} else {
// FIXME #6377
f()
}).finally {
rt::rust_task_inhibit_kill(t);
}
}
@ -583,14 +594,19 @@ pub unsafe fn rekillable<U>(f: &fn() -> U) -> U {
* For use with exclusive ARCs, which use pthread mutexes directly.
*/
pub unsafe fn atomically<U>(f: &fn() -> U) -> U {
let t = rt::rust_get_task();
do (|| {
rt::rust_task_inhibit_kill(t);
rt::rust_task_inhibit_yield(t);
if context() == OldTaskContext {
let t = rt::rust_get_task();
do (|| {
rt::rust_task_inhibit_kill(t);
rt::rust_task_inhibit_yield(t);
f()
}).finally {
rt::rust_task_allow_yield(t);
rt::rust_task_allow_kill(t);
}
} else {
// FIXME #6377
f()
}).finally {
rt::rust_task_allow_yield(t);
rt::rust_task_allow_kill(t);
}
}

View File

@ -581,7 +581,7 @@ fn spawn_raw_newsched(_opts: TaskOpts, f: ~fn()) {
use rt::sched::*;
let mut sched = local_sched::take();
let task = ~Task::new(&mut sched.stack_pool, f);
let task = ~Coroutine::new(&mut sched.stack_pool, f);
sched.schedule_new_task(task);
}

View File

@ -16,12 +16,12 @@ use libc::{c_char, c_uchar, c_void, size_t, uintptr_t, c_int, STDERR_FILENO};
use managed::raw::BoxRepr;
use str;
use sys;
use unstable::exchange_alloc;
use cast::transmute;
use rt::{context, OldTaskContext};
use rt::local_services::borrow_local_services;
use option::{Option, Some, None};
use io;
use rt::global_heap;
#[allow(non_camel_case_types)]
pub type rust_task = c_void;
@ -153,7 +153,7 @@ unsafe fn fail_borrowed(box: *mut BoxRepr, file: *c_char, line: size_t) {
#[lang="exchange_malloc"]
#[inline(always)]
pub unsafe fn exchange_malloc(td: *c_char, size: uintptr_t) -> *c_char {
transmute(exchange_alloc::malloc(transmute(td), transmute(size)))
transmute(global_heap::malloc(transmute(td), transmute(size)))
}
/// Because this code is so perf. sensitive, use a static constant so that
@ -233,7 +233,7 @@ impl DebugPrints for io::fd_t {
#[lang="exchange_free"]
#[inline(always)]
pub unsafe fn exchange_free(ptr: *c_char) {
exchange_alloc::free(transmute(ptr))
global_heap::free(transmute(ptr))
}
#[lang="malloc"]
@ -423,18 +423,31 @@ pub unsafe fn strdup_uniq(ptr: *c_uchar, len: uint) -> ~str {
#[lang="start"]
pub fn start(main: *u8, argc: int, argv: **c_char,
crate_map: *u8) -> int {
use libc::getenv;
use rt::start;
use rt;
use sys::Closure;
use ptr;
use cast;
use os;
unsafe {
let use_old_rt = do str::as_c_str("RUST_NEWRT") |s| {
getenv(s).is_null()
};
let use_old_rt = os::getenv("RUST_NEWRT").is_none();
if use_old_rt {
return rust_start(main as *c_void, argc as c_int, argv,
crate_map as *c_void) as int;
} else {
return start(main, argc, argv, crate_map);
return do rt::start(argc, argv as **u8, crate_map) {
unsafe {
// `main` is an `fn() -> ()` that doesn't take an environment
// XXX: Could also call this as an `extern "Rust" fn` once they work
let main = Closure {
code: main as *(),
env: ptr::null(),
};
let mainfn: &fn() = cast::transmute(main);
mainfn();
}
};
}
}

View File

@ -19,7 +19,6 @@ pub mod at_exit;
pub mod global;
pub mod finally;
pub mod weak_task;
pub mod exchange_alloc;
pub mod intrinsics;
pub mod simd;
pub mod extfmt;

View File

@ -819,8 +819,8 @@ extern {
unsafe fn rust_uv_timer_start(
timer_handle: *uv_timer_t,
cb: *u8,
timeout: libc::c_uint,
repeat: libc::c_uint) -> libc::c_int;
timeout: libc::uint64_t,
repeat: libc::uint64_t) -> libc::c_int;
unsafe fn rust_uv_timer_stop(handle: *uv_timer_t) -> libc::c_int;
unsafe fn rust_uv_getaddrinfo(loop_ptr: *libc::c_void,
@ -1084,8 +1084,8 @@ pub unsafe fn timer_init(loop_ptr: *libc::c_void,
}
pub unsafe fn timer_start(timer_ptr: *uv_timer_t, cb: *u8, timeout: uint,
repeat: uint) -> libc::c_int {
return rust_uv_timer_start(timer_ptr, cb, timeout as libc::c_uint,
repeat as libc::c_uint);
return rust_uv_timer_start(timer_ptr, cb, timeout as libc::uint64_t,
repeat as libc::uint64_t);
}
pub unsafe fn timer_stop(timer_ptr: *uv_timer_t) -> libc::c_int {
return rust_uv_timer_stop(timer_ptr);

View File

@ -830,14 +830,14 @@ rust_get_rt_env() {
}
#ifndef _WIN32
pthread_key_t sched_key;
pthread_key_t rt_key = -1;
#else
DWORD sched_key;
DWORD rt_key = -1;
#endif
extern "C" void*
rust_get_sched_tls_key() {
return &sched_key;
rust_get_rt_tls_key() {
return &rt_key;
}
// Initialize the global state required by the new scheduler
@ -852,10 +852,10 @@ rust_initialize_global_state() {
if (!initialized) {
#ifndef _WIN32
assert(!pthread_key_create(&sched_key, NULL));
assert(!pthread_key_create(&rt_key, NULL));
#else
sched_key = TlsAlloc();
assert(sched_key != TLS_OUT_OF_INDEXES);
rt_key = TlsAlloc();
assert(rt_key != TLS_OUT_OF_INDEXES);
#endif
initialized = true;

View File

@ -13,6 +13,7 @@
// that might come from the environment is loaded here, once, during
// init.
#include "sync/lock_and_signal.h"
#include "rust_env.h"
// The environment variables that the runtime knows about
@ -26,6 +27,18 @@
#define RUST_DEBUG_MEM "RUST_DEBUG_MEM"
#define RUST_DEBUG_BORROW "RUST_DEBUG_BORROW"
static lock_and_signal env_lock;
extern "C" CDECL void
rust_take_env_lock() {
env_lock.lock();
}
extern "C" CDECL void
rust_drop_env_lock() {
env_lock.unlock();
}
#if defined(__WIN32__)
static int
get_num_cpus() {
@ -119,6 +132,8 @@ copyenv(const char* name) {
rust_env*
load_env(int argc, char **argv) {
scoped_lock with(env_lock);
rust_env *env = (rust_env*)malloc(sizeof(rust_env));
env->num_sched_threads = (size_t)get_num_threads();
@ -141,3 +156,4 @@ free_env(rust_env *env) {
free(env->rust_seed);
free(env);
}

View File

@ -15,14 +15,15 @@
#include <string.h>
#include <stdio.h>
uintptr_t exchange_count = 0;
extern uintptr_t rust_exchange_count;
uintptr_t rust_exchange_count = 0;
void *
rust_exchange_alloc::malloc(size_t size) {
void *value = ::malloc(size);
assert(value);
sync::increment(exchange_count);
sync::increment(rust_exchange_count);
return value;
}
@ -36,20 +37,15 @@ rust_exchange_alloc::realloc(void *ptr, size_t size) {
void
rust_exchange_alloc::free(void *ptr) {
sync::decrement(exchange_count);
sync::decrement(rust_exchange_count);
::free(ptr);
}
extern "C" uintptr_t *
rust_get_exchange_count_ptr() {
return &exchange_count;
}
void
rust_check_exchange_count_on_exit() {
if (exchange_count != 0) {
if (rust_exchange_count != 0) {
printf("exchange heap not empty on exit\n");
printf("%d dangling allocations\n", (int)exchange_count);
printf("%d dangling allocations\n", (int)rust_exchange_count);
abort();
}
}

View File

@ -324,6 +324,10 @@ void update_log_settings(void* crate_map, char* settings) {
free(buffer);
}
extern "C" CDECL void
rust_update_log_settings(void* crate_map, char* settings) {
update_log_settings(crate_map, settings);
}
//
// Local Variables:

View File

@ -92,3 +92,14 @@ destroy_exchange_stack(rust_exchange_alloc *exchange, stk_seg *stk) {
deregister_valgrind_stack(stk);
exchange->free(stk);
}
extern "C" CDECL unsigned int
rust_valgrind_stack_register(void *start, void *end) {
return VALGRIND_STACK_REGISTER(start, end);
}
extern "C" CDECL void
rust_valgrind_stack_deregister(unsigned int id) {
VALGRIND_STACK_DEREGISTER(id);
}

View File

@ -229,7 +229,7 @@ rust_uv_timer_init(uv_loop_t* loop, uv_timer_t* timer) {
extern "C" int
rust_uv_timer_start(uv_timer_t* the_timer, uv_timer_cb cb,
uint32_t timeout, uint32_t repeat) {
int64_t timeout, int64_t repeat) {
return uv_timer_start(the_timer, cb, timeout, repeat);
}

View File

@ -195,8 +195,8 @@ rust_register_exit_function
rust_get_global_data_ptr
rust_inc_kernel_live_count
rust_dec_kernel_live_count
rust_get_exchange_count_ptr
rust_get_sched_tls_key
rust_exchange_count
rust_get_rt_tls_key
swap_registers
rust_readdir
rust_opendir
@ -234,3 +234,8 @@ rust_try
rust_begin_unwind
rust_take_task_borrow_list
rust_set_task_borrow_list
rust_valgrind_stack_register
rust_valgrind_stack_deregister
rust_take_env_lock
rust_drop_env_lock
rust_update_log_settings

View File

@ -0,0 +1,18 @@
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// A simple test of starting the runtime manually
#[start]
fn start(argc: int, argv: **u8, crate_map: *u8) -> int {
do core::rt::start(argc, argv, crate_map) {
debug!("creating my own runtime is joy");
}
}