Merge remote-tracking branch 'brson/io'

This also reverts some changes to TLS that were leaking memory.

Conflicts:
	src/libcore/rt/uv/net.rs
	src/libcore/task/local_data_priv.rs
	src/libcore/unstable/lang.rs
This commit is contained in:
Brian Anderson 2013-04-23 15:16:04 -07:00
commit e944c7dade
42 changed files with 1206 additions and 292 deletions

View File

@ -238,7 +238,7 @@ $(foreach target,$(CFG_TARGET_TRIPLES),\
CORELIB_CRATE := $(S)src/libcore/core.rc
CORELIB_INPUTS := $(wildcard $(addprefix $(S)src/libcore/, \
core.rc *.rs */*.rs */*/*rs))
core.rc *.rs */*.rs */*/*rs */*/*/*rs))
######################################################################
# Standard library variables

View File

@ -169,7 +169,7 @@ $$(LIBUV_LIB_$(1)): $$(LIBUV_DEPS)
else ifeq ($(OSTYPE_$(1)), linux-androideabi)
$$(LIBUV_LIB_$(1)): $$(LIBUV_DEPS)
$$(Q)$$(MAKE) -C $$(S)src/libuv/ \
CFLAGS="$$(LIBUV_FLAGS_$$(HOST_$(1))) $$(SNAP_DEFINES)" \
CFLAGS="$$(LIBUV_FLAGS_$$(HOST_$(1))) $$(SNAP_DEFINES) $$(CFLAGS)" \
LDFLAGS="$$(LIBUV_FLAGS_$$(HOST_$(1)))" \
CC="$$(CC_$(1))" \
CXX="$$(CXX_$(1))" \
@ -181,7 +181,7 @@ $$(LIBUV_LIB_$(1)): $$(LIBUV_DEPS)
else
$$(LIBUV_LIB_$(1)): $$(LIBUV_DEPS)
$$(Q)$$(MAKE) -C $$(S)src/libuv/ \
CFLAGS="$$(LIBUV_FLAGS_$$(HOST_$(1))) $$(SNAP_DEFINES)" \
CFLAGS="$$(LIBUV_FLAGS_$$(HOST_$(1))) $$(SNAP_DEFINES) $$(CFLAGS)" \
LDFLAGS="$$(LIBUV_FLAGS_$$(HOST_$(1)))" \
CC="$$(CC_$(1))" \
CXX="$$(CXX_$(1))" \

View File

@ -192,4 +192,25 @@ mod test {
assert!(trapped);
}
// Issue #6009
mod m {
condition! {
sadness: int -> int;
}
mod n {
use super::sadness;
#[test]
fn test_conditions_are_public() {
let mut trapped = false;
do sadness::cond.trap(|_| {
0
}).in {
sadness::cond.raise(0);
}
}
}
}
}

View File

@ -120,6 +120,9 @@ pub mod linkhack {
}
}
// Internal macros
mod macros;
/* The Prelude. */
pub mod prelude;

39
src/libcore/macros.rs Normal file
View File

@ -0,0 +1,39 @@
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
#[macro_escape];
// Some basic logging
macro_rules! rtdebug (
($( $arg:expr),+) => ( {
dumb_println(fmt!( $($arg),+ ));
fn dumb_println(s: &str) {
use io::WriterUtil;
let dbg = ::libc::STDERR_FILENO as ::io::fd_t;
dbg.write_str(s);
dbg.write_str("\n");
}
} )
)
// An alternate version with no output, for turning off logging
macro_rules! rtdebug_ (
($( $arg:expr),+) => ( $(let _ = $arg)*; )
)
macro_rules! abort(
($( $msg:expr),+) => ( {
rtdebug!($($msg),+);
unsafe { ::libc::abort(); }
} )
)

View File

@ -9,14 +9,10 @@
// except according to those terms.
use prelude::*;
use super::misc::PathLike;
use super::support::PathLike;
use super::{Reader, Writer, Seek, Close};
use super::{IoError, SeekStyle};
/// Open a file with the default FileMode and FileAccess
/// # XXX are there sane defaults here?
pub fn open_file<P: PathLike>(_path: &P) -> FileStream { fail!() }
/// # XXX
/// * Ugh, this is ridiculous. What is the best way to represent these options?
enum FileMode {
@ -46,7 +42,7 @@ impl FileStream {
pub fn open<P: PathLike>(_path: &P,
_mode: FileMode,
_access: FileAccess
) -> Result<FileStream, IoError> {
) -> Option<FileStream> {
fail!()
}
}

View File

@ -17,7 +17,7 @@
use prelude::*;
use super::*;
use cmp::min;
/// Writes to an owned, growable byte vector
pub struct MemWriter {
@ -29,13 +29,15 @@ impl MemWriter {
}
impl Writer for MemWriter {
fn write(&mut self, _buf: &[u8]) { fail!() }
fn write(&mut self, buf: &[u8]) {
self.buf.push_all(buf)
}
fn flush(&mut self) { /* no-op */ }
}
impl Seek for MemWriter {
fn tell(&self) -> u64 { fail!() }
fn tell(&self) -> u64 { self.buf.len() as u64 }
fn seek(&mut self, _pos: i64, _style: SeekStyle) { fail!() }
}
@ -77,13 +79,27 @@ impl MemReader {
}
impl Reader for MemReader {
fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() }
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
{ if self.eof() { return None; } }
fn eof(&mut self) -> bool { fail!() }
let write_len = min(buf.len(), self.buf.len() - self.pos);
{
let input = self.buf.slice(self.pos, self.pos + write_len);
let output = vec::mut_slice(buf, 0, write_len);
assert!(input.len() == output.len());
vec::bytes::copy_memory(output, input, write_len);
}
self.pos += write_len;
assert!(self.pos <= self.buf.len());
return Some(write_len);
}
fn eof(&mut self) -> bool { self.pos == self.buf.len() }
}
impl Seek for MemReader {
fn tell(&self) -> u64 { fail!() }
fn tell(&self) -> u64 { self.pos as u64 }
fn seek(&mut self, _pos: i64, _style: SeekStyle) { fail!() }
}
@ -163,4 +179,43 @@ impl<'self> Seek for BufReader<'self> {
fn tell(&self) -> u64 { fail!() }
fn seek(&mut self, _pos: i64, _style: SeekStyle) { fail!() }
}
}
#[cfg(test)]
mod test {
use prelude::*;
use super::*;
#[test]
fn test_mem_writer() {
let mut writer = MemWriter::new();
assert!(writer.tell() == 0);
writer.write([0]);
assert!(writer.tell() == 1);
writer.write([1, 2, 3]);
writer.write([4, 5, 6, 7]);
assert!(writer.tell() == 8);
assert!(writer.inner() == ~[0, 1, 2, 3, 4, 5 , 6, 7]);
}
#[test]
fn test_mem_reader() {
let mut reader = MemReader::new(~[0, 1, 2, 3, 4, 5, 6, 7]);
let mut buf = [];
assert!(reader.read(buf) == Some(0));
assert!(reader.tell() == 0);
let mut buf = [0];
assert!(reader.read(buf) == Some(1));
assert!(reader.tell() == 1);
assert!(buf == [0]);
let mut buf = [0, ..4];
assert!(reader.read(buf) == Some(4));
assert!(reader.tell() == 5);
assert!(buf == [1, 2, 3, 4]);
assert!(reader.read(buf) == Some(3));
assert!(buf.slice(0, 3) == [5, 6, 7]);
assert!(reader.eof());
assert!(reader.read(buf) == None);
assert!(reader.eof());
}
}

View File

@ -10,8 +10,12 @@
/*! Synchronous I/O
This module defines the Rust interface for synchronous I/O.
It supports file access,
This module defines the Rust interface for synchronous I/O. It is
build around Reader and Writer traits that define byte stream sources
and sinks. Implementations are provided for common I/O streams like
file, TCP, UDP, Unix domain sockets, multiple types of memory bufers.
Readers and Writers may be composed to add things like string parsing,
and compression.
This will likely live in core::io, not core::rt::io.
@ -27,7 +31,7 @@ Some examples of obvious things you might want to do
* Read a complete file to a string, (converting newlines?)
let contents = open("message.txt").read_to_str(); // read_to_str??
let contents = FileStream::open("message.txt").read_to_str(); // read_to_str??
* Write a line to a file
@ -36,13 +40,26 @@ Some examples of obvious things you might want to do
* Iterate over the lines of a file
do FileStream::open("message.txt").each_line |line| {
println(line)
}
* Pull the lines of a file into a vector of strings
let lines = FileStream::open("message.txt").line_iter().to_vec();
* Make an simple HTTP request
let socket = TcpStream::open("localhost:8080");
socket.write_line("GET / HTTP/1.0");
socket.write_line("");
let response = socket.read_to_end();
* Connect based on URL? Requires thinking about where the URL type lives
and how to make protocol handlers extensible, e.g. the "tcp" protocol
yields a `TcpStream`.
connect("tcp://localhost:8080").write_line("HTTP 1.0 GET /");
connect("tcp://localhost:8080");
# Terms
@ -104,25 +121,29 @@ pub use self::stdio::stderr;
pub use self::stdio::print;
pub use self::stdio::println;
pub use self::file::open_file;
pub use self::file::FileStream;
pub use self::net::Listener;
pub use self::net::ip::IpAddr;
pub use self::net::tcp::TcpListener;
pub use self::net::tcp::TcpStream;
pub use self::net::udp::UdpStream;
// Some extension traits that all Readers and Writers get.
pub use self::util::ReaderUtil;
pub use self::util::ReaderByteConversions;
pub use self::util::WriterByteConversions;
pub use self::extensions::ReaderUtil;
pub use self::extensions::ReaderByteConversions;
pub use self::extensions::WriterByteConversions;
/// Synchronous, non-blocking file I/O.
pub mod file;
/// Synchronous, non-blocking network I/O.
#[path = "net/mod.rs"]
pub mod net;
pub mod net {
pub mod tcp;
pub mod udp;
pub mod ip;
#[cfg(unix)]
pub mod unix;
pub mod http;
}
/// Readers and Writers for memory buffers and strings.
#[cfg(not(stage0))] // XXX Using unsnapshotted features
@ -131,6 +152,10 @@ pub mod mem;
/// Non-blocking access to stdin, stdout, stderr
pub mod stdio;
/// Implementations for Option
#[cfg(not(stage0))] // Requires condition! fixes
mod option;
/// Basic stream compression. XXX: Belongs with other flate code
#[cfg(not(stage0))] // XXX Using unsnapshotted features
pub mod flate;
@ -140,10 +165,10 @@ pub mod flate;
pub mod comm_adapters;
/// Extension traits
mod util;
mod extensions;
/// Non-I/O things needed by the I/O module
mod misc;
mod support;
/// Thread-blocking implementations
pub mod native {
@ -173,12 +198,14 @@ pub struct IoError {
detail: Option<~str>
}
#[deriving(Eq)]
pub enum IoErrorKind {
FileNotFound,
FilePermission,
ConnectionFailed,
Closed,
OtherIoError
OtherIoError,
PreviousIoError
}
// XXX: Can't put doc comments on macros
@ -211,9 +238,9 @@ pub trait Reader {
/// println(reader.read_line());
/// }
///
/// # XXX
/// # Failue
///
/// What does this return if the Reader is in an error state?
/// Returns `true` on failure.
fn eof(&mut self) -> bool;
}
@ -253,9 +280,30 @@ pub enum SeekStyle {
/// * Are `u64` and `i64` the right choices?
pub trait Seek {
fn tell(&self) -> u64;
/// Seek to an offset in a stream
///
/// A successful seek clears the EOF indicator.
///
/// # XXX
///
/// * What is the behavior when seeking past the end of a stream?
fn seek(&mut self, pos: i64, style: SeekStyle);
}
/// A listener is a value that listens for connections
pub trait Listener<S> {
/// Wait for and accept an incoming connection
///
/// Returns `None` on timeout.
///
/// # Failure
///
/// Raises `io_error` condition. If the condition is handled,
/// then `accept` returns `None`.
fn accept(&mut self) -> Option<S>;
}
/// Common trait for decorator types.
///
/// Provides accessors to get the inner, 'decorated' values. The I/O library
@ -281,3 +329,16 @@ pub trait Decorator<T> {
/// Take a mutable reference to the decorated value
fn inner_mut_ref<'a>(&'a mut self) -> &'a mut T;
}
pub fn standard_error(kind: IoErrorKind) -> IoError {
match kind {
PreviousIoError => {
IoError {
kind: PreviousIoError,
desc: "Failing due to a previous I/O error",
detail: None
}
}
_ => fail!()
}
}

View File

@ -1,31 +0,0 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
pub mod tcp;
pub mod udp;
pub mod ip;
#[cfg(unix)]
pub mod unix;
pub mod http;
/// A listener is a value that listens for connections
pub trait Listener<S> {
/// Wait for and accept an incoming connection
///
/// Returns `None` on timeout.
///
/// # Failure
///
/// Raises `io_error` condition. If the condition is handled,
/// then `accept` returns `None`.
fn accept(&mut self) -> Option<S>;
}

View File

@ -16,7 +16,7 @@ use super::ip::IpAddr;
pub struct TcpStream;
impl TcpStream {
pub fn connect(_addr: IpAddr) -> Result<TcpStream, IoError> {
pub fn connect(_addr: IpAddr) -> Option<TcpStream> {
fail!()
}
}
@ -40,7 +40,7 @@ impl Close for TcpStream {
pub struct TcpListener;
impl TcpListener {
pub fn new(_addr: IpAddr) -> TcpListener {
pub fn bind(_addr: IpAddr) -> Option<TcpListener> {
fail!()
}
}
@ -48,3 +48,30 @@ impl TcpListener {
impl Listener<TcpStream> for TcpListener {
fn accept(&mut self) -> Option<TcpStream> { fail!() }
}
#[cfg(test)]
mod test {
use super::*;
use rt::test::*;
#[test] #[ignore]
fn smoke_test() {
/*do run_in_newsched_task {
let addr = next_test_ip4();
do spawn_immediately {
let listener = TcpListener::bind(addr);
do listener.accept() {
let mut buf = [0];
listener.read(buf);
assert!(buf[0] == 99);
}
}
do spawn_immediately {
let stream = TcpStream::connect(addr);
stream.write([99]);
}
}*/
}
}

View File

@ -16,7 +16,7 @@ use super::ip::IpAddr;
pub struct UdpStream;
impl UdpStream {
pub fn connect(_addr: IpAddr) -> Result<UdpStream, IoError> {
pub fn connect(_addr: IpAddr) -> Option<UdpStream> {
fail!()
}
}
@ -40,7 +40,7 @@ impl Close for UdpStream {
pub struct UdpListener;
impl UdpListener {
pub fn new(_addr: IpAddr) -> UdpListener {
pub fn bind(_addr: IpAddr) -> Option<UdpListener> {
fail!()
}
}

View File

@ -11,12 +11,12 @@
use prelude::*;
use super::*;
use super::super::*;
use super::super::misc::PathLike;
use super::super::support::PathLike;
pub struct UnixStream;
impl UnixStream {
pub fn connect<P: PathLike>(_path: &P) -> Result<UnixStream, IoError> {
pub fn connect<P: PathLike>(_path: &P) -> Option<UnixStream> {
fail!()
}
}
@ -40,7 +40,7 @@ impl Close for UnixStream {
pub struct UnixListener;
impl UnixListener {
pub fn new<P: PathLike>(_path: &P) -> UnixListener {
pub fn bind<P: PathLike>(_path: &P) -> Option<UnixListener> {
fail!()
}
}

153
src/libcore/rt/io/option.rs Normal file
View File

@ -0,0 +1,153 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Implementations of I/O traits for the Option type
//!
//! I/O constructors return option types to allow errors to be handled.
//! These implementations allow e.g. `Option<FileStream>` to be used
//! as a `Reader` without unwrapping the option first.
//!
//! # XXX Seek and Close
use option::*;
use super::{Reader, Writer, Listener};
use super::{standard_error, PreviousIoError, io_error, IoError};
fn prev_io_error() -> IoError {
standard_error(PreviousIoError)
}
impl<W: Writer> Writer for Option<W> {
fn write(&mut self, buf: &[u8]) {
match *self {
Some(ref mut writer) => writer.write(buf),
None => io_error::cond.raise(prev_io_error())
}
}
fn flush(&mut self) {
match *self {
Some(ref mut writer) => writer.flush(),
None => io_error::cond.raise(prev_io_error())
}
}
}
impl<R: Reader> Reader for Option<R> {
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
match *self {
Some(ref mut reader) => reader.read(buf),
None => {
io_error::cond.raise(prev_io_error());
None
}
}
}
fn eof(&mut self) -> bool {
match *self {
Some(ref mut reader) => reader.eof(),
None => {
io_error::cond.raise(prev_io_error());
true
}
}
}
}
impl<L: Listener<S>, S> Listener<S> for Option<L> {
fn accept(&mut self) -> Option<S> {
match *self {
Some(ref mut listener) => listener.accept(),
None => {
io_error::cond.raise(prev_io_error());
None
}
}
}
}
#[cfg(test)]
mod test {
use option::*;
use super::super::mem::*;
use rt::test::*;
use super::super::{PreviousIoError, io_error};
#[test]
fn test_option_writer() {
do run_in_newsched_task {
let mut writer: Option<MemWriter> = Some(MemWriter::new());
writer.write([0, 1, 2]);
writer.flush();
assert!(writer.unwrap().inner() == ~[0, 1, 2]);
}
}
#[test]
fn test_option_writer_error() {
do run_in_newsched_task {
let mut writer: Option<MemWriter> = None;
let mut called = false;
do io_error::cond.trap(|err| {
assert!(err.kind == PreviousIoError);
called = true;
}).in {
writer.write([0, 0, 0]);
}
assert!(called);
let mut called = false;
do io_error::cond.trap(|err| {
assert!(err.kind == PreviousIoError);
called = true;
}).in {
writer.flush();
}
assert!(called);
}
}
#[test]
fn test_option_reader() {
do run_in_newsched_task {
let mut reader: Option<MemReader> = Some(MemReader::new(~[0, 1, 2, 3]));
let mut buf = [0, 0];
reader.read(buf);
assert!(buf == [0, 1]);
assert!(!reader.eof());
}
}
#[test]
fn test_option_reader_error() {
let mut reader: Option<MemReader> = None;
let mut buf = [];
let mut called = false;
do io_error::cond.trap(|err| {
assert!(err.kind == PreviousIoError);
called = true;
}).in {
reader.read(buf);
}
assert!(called);
let mut called = false;
do io_error::cond.trap(|err| {
assert!(err.kind == PreviousIoError);
called = true;
}).in {
assert!(reader.eof());
}
assert!(called);
}
}

View File

@ -0,0 +1,81 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! The local, garbage collected heap
use libc::{c_void, uintptr_t, size_t};
use ops::Drop;
type MemoryRegion = c_void;
type BoxedRegion = c_void;
pub type OpaqueBox = c_void;
pub type TypeDesc = c_void;
pub struct LocalHeap {
memory_region: *MemoryRegion,
boxed_region: *BoxedRegion
}
impl LocalHeap {
pub fn new() -> LocalHeap {
unsafe {
// Don't need synchronization for the single-threaded local heap
let synchronized = false as uintptr_t;
// XXX: These usually come from the environment
let detailed_leaks = false as uintptr_t;
let poison_on_free = false as uintptr_t;
let region = rust_new_memory_region(synchronized, detailed_leaks, poison_on_free);
assert!(region.is_not_null());
let boxed = rust_new_boxed_region(region, poison_on_free);
assert!(boxed.is_not_null());
LocalHeap {
memory_region: region,
boxed_region: boxed
}
}
}
pub fn alloc(&mut self, td: *TypeDesc, size: uint) -> *OpaqueBox {
unsafe {
return rust_boxed_region_malloc(self.boxed_region, td, size as size_t);
}
}
pub fn free(&mut self, box: *OpaqueBox) {
unsafe {
return rust_boxed_region_free(self.boxed_region, box);
}
}
}
impl Drop for LocalHeap {
fn finalize(&self) {
unsafe {
rust_delete_boxed_region(self.boxed_region);
rust_delete_memory_region(self.memory_region);
}
}
}
extern {
fn rust_new_memory_region(synchronized: uintptr_t,
detailed_leaks: uintptr_t,
poison_on_free: uintptr_t) -> *MemoryRegion;
fn rust_delete_memory_region(region: *MemoryRegion);
fn rust_new_boxed_region(region: *MemoryRegion,
poison_on_free: uintptr_t) -> *BoxedRegion;
fn rust_delete_boxed_region(region: *BoxedRegion);
fn rust_boxed_region_malloc(region: *BoxedRegion,
td: *TypeDesc,
size: size_t) -> *OpaqueBox;
fn rust_boxed_region_free(region: *BoxedRegion, box: *OpaqueBox);
}

View File

@ -0,0 +1,223 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Language-level runtime services that should reasonably expected
//! to be available 'everywhere'. Local heaps, GC, unwinding,
//! local storage, and logging. Even a 'freestanding' Rust would likely want
//! to implement this.
//! Local services may exist in at least three different contexts:
//! when running as a task, when running in the scheduler's context,
//! or when running outside of a scheduler but with local services
//! (freestanding rust with local services?).
use prelude::*;
use libc::{c_void, uintptr_t};
use cast::transmute;
use super::sched::{Task, local_sched};
use super::local_heap::LocalHeap;
pub struct LocalServices {
heap: LocalHeap,
gc: GarbageCollector,
storage: LocalStorage,
logger: Logger,
unwinder: Option<Unwinder>,
destroyed: bool
}
pub struct GarbageCollector;
pub struct LocalStorage(*c_void, Option<~fn(*c_void)>);
pub struct Logger;
pub struct Unwinder {
unwinding: bool,
}
impl LocalServices {
pub fn new() -> LocalServices {
LocalServices {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(ptr::null(), None),
logger: Logger,
unwinder: Some(Unwinder { unwinding: false }),
destroyed: false
}
}
pub fn without_unwinding() -> LocalServices {
LocalServices {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(ptr::null(), None),
logger: Logger,
unwinder: None,
destroyed: false
}
}
pub fn run(&mut self, f: &fn()) {
// This is just an assertion that `run` was called unsafely
// and this instance of LocalServices is still accessible.
do borrow_local_services |sched| {
assert!(ptr::ref_eq(sched, self));
}
match self.unwinder {
Some(ref mut unwinder) => {
// If there's an unwinder then set up the catch block
unwinder.try(f);
}
None => {
// Otherwise, just run the body
f()
}
}
self.destroy();
}
/// Must be called manually before finalization to clean up
/// thread-local resources. Some of the routines here expect
/// LocalServices to be available recursively so this must be
/// called unsafely, without removing LocalServices from
/// thread-local-storage.
fn destroy(&mut self) {
// This is just an assertion that `destroy` was called unsafely
// and this instance of LocalServices is still accessible.
do borrow_local_services |sched| {
assert!(ptr::ref_eq(sched, self));
}
match self.storage {
LocalStorage(ptr, Some(ref dtor)) => {
(*dtor)(ptr)
}
_ => ()
}
self.destroyed = true;
}
}
impl Drop for LocalServices {
fn finalize(&self) { assert!(self.destroyed) }
}
// Just a sanity check to make sure we are catching a Rust-thrown exception
static UNWIND_TOKEN: uintptr_t = 839147;
impl Unwinder {
pub fn try(&mut self, f: &fn()) {
use sys::Closure;
unsafe {
let closure: Closure = transmute(f);
let code = transmute(closure.code);
let env = transmute(closure.env);
let token = rust_try(try_fn, code, env);
assert!(token == 0 || token == UNWIND_TOKEN);
}
extern fn try_fn(code: *c_void, env: *c_void) {
unsafe {
let closure: Closure = Closure {
code: transmute(code),
env: transmute(env),
};
let closure: &fn() = transmute(closure);
closure();
}
}
extern {
#[rust_stack]
fn rust_try(f: *u8, code: *c_void, data: *c_void) -> uintptr_t;
}
}
pub fn begin_unwind(&mut self) -> ! {
self.unwinding = true;
unsafe {
rust_begin_unwind(UNWIND_TOKEN);
return transmute(());
}
extern {
fn rust_begin_unwind(token: uintptr_t);
}
}
}
/// Borrow a pointer to the installed local services.
/// Fails (likely aborting the process) if local services are not available.
pub fn borrow_local_services(f: &fn(&mut LocalServices)) {
do local_sched::borrow |sched| {
match sched.current_task {
Some(~ref mut task) => {
f(&mut task.local_services)
}
None => {
fail!(~"no local services for schedulers yet")
}
}
}
}
pub unsafe fn unsafe_borrow_local_services() -> &mut LocalServices {
use cast::transmute_mut_region;
match local_sched::unsafe_borrow().current_task {
Some(~ref mut task) => {
transmute_mut_region(&mut task.local_services)
}
None => {
fail!(~"no local services for schedulers yet")
}
}
}
#[cfg(test)]
mod test {
use rt::test::*;
#[test]
fn local_heap() {
do run_in_newsched_task() {
let a = @5;
let b = a;
assert!(*a == 5);
assert!(*b == 5);
}
}
#[test]
fn tls() {
use task::local_data::*;
do run_in_newsched_task() {
unsafe {
fn key(_x: @~str) { }
local_data_set(key, @~"data");
assert!(*local_data_get(key).get() == ~"data");
fn key2(_x: @~str) { }
local_data_set(key2, @~"data");
assert!(*local_data_get(key2).get() == ~"data");
}
}
}
#[test]
fn unwind() {
do run_in_newsched_task() {
let result = spawntask_try(||());
assert!(result.is_ok());
let result = spawntask_try(|| fail!());
assert!(result.is_err());
}
}
}

View File

@ -12,26 +12,6 @@
use libc::c_char;
// Some basic logging
macro_rules! rtdebug_ (
($( $arg:expr),+) => ( {
dumb_println(fmt!( $($arg),+ ));
fn dumb_println(s: &str) {
use io::WriterUtil;
let dbg = ::libc::STDERR_FILENO as ::io::fd_t;
dbg.write_str(s);
dbg.write_str("\n");
}
} )
)
// An alternate version with no output, for turning off logging
macro_rules! rtdebug (
($( $arg:expr),+) => ( $(let _ = $arg)*; )
)
#[path = "sched/mod.rs"]
mod sched;
mod rtio;
@ -48,6 +28,12 @@ mod stack;
mod context;
mod thread;
pub mod env;
pub mod local_services;
mod local_heap;
/// Tools for testing the runtime
#[cfg(test)]
pub mod test;
#[cfg(stage0)]
pub fn start(main: *u8, _argc: int, _argv: *c_char, _crate_map: *u8) -> int {
@ -93,7 +79,7 @@ pub fn start(main: *u8, _argc: int, _argv: **c_char, _crate_map: *u8) -> int {
/// Different runtime services are available depending on context.
#[deriving(Eq)]
pub enum RuntimeContext {
// Only default services, e.g. exchange heap
// Only the exchange heap is available
GlobalContext,
// The scheduler may be accessed
SchedulerContext,
@ -160,24 +146,3 @@ fn test_context() {
sched.run();
}
}
// For setting up tests of the new scheduler
#[cfg(test)]
pub fn run_in_newsched_task(f: ~fn()) {
use cell::Cell;
use unstable::run_in_bare_thread;
use self::sched::Task;
use self::uvio::UvEventLoop;
let f = Cell(Cell(f));
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let f = f.take();
let task = ~do Task::new(&mut sched.stack_pool) {
(f.take())();
};
sched.task_queue.push_back(task);
sched.run();
}
}

View File

@ -16,6 +16,7 @@ use super::work_queue::WorkQueue;
use super::stack::{StackPool, StackSegment};
use super::rtio::{EventLoop, EventLoopObject};
use super::context::Context;
use super::local_services::LocalServices;
use cell::Cell;
#[cfg(test)] use super::uvio::UvEventLoop;
@ -38,7 +39,7 @@ pub struct Scheduler {
/// Always valid when a task is executing, otherwise not
priv saved_context: Context,
/// The currently executing task
priv current_task: Option<~Task>,
current_task: Option<~Task>,
/// An action performed after a context switch on behalf of the
/// code running before the context switch
priv cleanup_job: Option<CleanupJob>
@ -148,7 +149,7 @@ pub impl Scheduler {
}
}
// Control never reaches here
abort!("control reached end of task");
}
fn schedule_new_task(~self, task: ~Task) {
@ -326,10 +327,18 @@ pub struct Task {
/// These are always valid when the task is not running, unless
/// the task is dead
priv saved_context: Context,
/// The heap, GC, unwinding, local storage, logging
local_services: LocalServices
}
pub impl Task {
fn new(stack_pool: &mut StackPool, start: ~fn()) -> Task {
Task::with_local(stack_pool, LocalServices::new(), start)
}
fn with_local(stack_pool: &mut StackPool,
local_services: LocalServices,
start: ~fn()) -> Task {
let start = Task::build_start_wrapper(start);
let mut stack = stack_pool.take_segment(TASK_MIN_STACK_SIZE);
// NB: Context holds a pointer to that ~fn
@ -337,6 +346,7 @@ pub impl Task {
return Task {
current_stack_segment: stack,
saved_context: initial_context,
local_services: local_services
};
}
@ -349,9 +359,11 @@ pub impl Task {
unsafe {
let sched = local_sched::unsafe_borrow();
sched.run_cleanup_job();
}
start();
let sched = local_sched::unsafe_borrow();
let task = sched.current_task.get_mut_ref();
task.local_services.run(start);
}
let sched = local_sched::take();
sched.terminate_current_task();

122
src/libcore/rt/test.rs Normal file
View File

@ -0,0 +1,122 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use cell::Cell;
use result::{Result, Ok, Err};
use super::io::net::ip::{IpAddr, Ipv4};
use rt::local_services::LocalServices;
/// Creates a new scheduler in a new thread and runs a task in it,
/// then waits for the scheduler to exit. Failure of the task
/// will abort the process.
pub fn run_in_newsched_task(f: ~fn()) {
use unstable::run_in_bare_thread;
use super::sched::Task;
use super::uvio::UvEventLoop;
let f = Cell(f);
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~Task::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f.take());
sched.task_queue.push_back(task);
sched.run();
}
}
/// Test tasks will abort on failure instead of unwinding
pub fn spawntask(f: ~fn()) {
use super::*;
use super::sched::*;
let mut sched = local_sched::take();
let task = ~Task::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f);
do sched.switch_running_tasks_and_then(task) |task| {
let task = Cell(task);
let sched = local_sched::take();
sched.schedule_new_task(task.take());
}
}
/// Create a new task and run it right now. Aborts on failure
pub fn spawntask_immediately(f: ~fn()) {
use super::*;
use super::sched::*;
let mut sched = local_sched::take();
let task = ~Task::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f);
do sched.switch_running_tasks_and_then(task) |task| {
let task = Cell(task);
do local_sched::borrow |sched| {
sched.task_queue.push_front(task.take());
}
}
}
/// Spawn a task and wait for it to finish, returning whether it completed successfully or failed
pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
use cell::Cell;
use super::sched::*;
use task;
use unstable::finally::Finally;
// Our status variables will be filled in from the scheduler context
let mut failed = false;
let failed_ptr: *mut bool = &mut failed;
// Switch to the scheduler
let f = Cell(Cell(f));
let mut sched = local_sched::take();
do sched.deschedule_running_task_and_then() |old_task| {
let old_task = Cell(old_task);
let f = f.take();
let mut sched = local_sched::take();
let new_task = ~do Task::new(&mut sched.stack_pool) {
do (|| {
(f.take())()
}).finally {
// Check for failure then resume the parent task
unsafe { *failed_ptr = task::failing(); }
let sched = local_sched::take();
do sched.switch_running_tasks_and_then(old_task.take()) |new_task| {
let new_task = Cell(new_task);
do local_sched::borrow |sched| {
sched.task_queue.push_front(new_task.take());
}
}
}
};
sched.resume_task_immediately(new_task);
}
if !failed { Ok(()) } else { Err(()) }
}
/// Get a port number, starting at 9600, for use in tests
pub fn next_test_port() -> u16 {
unsafe {
return rust_dbg_next_port() as u16;
}
extern {
fn rust_dbg_next_port() -> ::libc::uintptr_t;
}
}
/// Get a unique localhost:port pair starting at 9600
pub fn next_test_ip4() -> IpAddr {
Ipv4(127, 0, 0, 1, next_test_port())
}

View File

@ -301,7 +301,8 @@ struct WatcherData {
write_cb: Option<ConnectionCallback>,
connect_cb: Option<ConnectionCallback>,
close_cb: Option<NullCallback>,
alloc_cb: Option<AllocCallback>
alloc_cb: Option<AllocCallback>,
buf: Option<Buf>
}
pub fn install_watcher_data<H, W: Watcher + NativeHandle<*H>>(watcher: &mut W) {
@ -311,7 +312,8 @@ pub fn install_watcher_data<H, W: Watcher + NativeHandle<*H>>(watcher: &mut W) {
write_cb: None,
connect_cb: None,
close_cb: None,
alloc_cb: None
alloc_cb: None,
buf: None
};
let data = transmute::<~WatcherData, *c_void>(data);
uvll::set_data_for_uv_handle(watcher.native_handle(), data);

View File

@ -19,12 +19,10 @@ use super::{Loop, Watcher, Request, UvError, Buf, Callback, NativeHandle, NullCa
vec_to_uv_buf, vec_from_uv_buf};
use super::super::io::net::ip::{IpAddr, Ipv4, Ipv6};
#[cfg(test)]
use unstable::run_in_bare_thread;
#[cfg(test)]
use super::super::thread::Thread;
#[cfg(test)]
use cell::Cell;
#[cfg(test)] use cell::Cell;
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use super::super::thread::Thread;
#[cfg(test)] use super::super::test::*;
fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) {
match addr {
@ -109,21 +107,25 @@ pub impl StreamWatcher {
let req = WriteRequest::new();
let buf = vec_to_uv_buf(msg);
// XXX: Allocation
let bufs = ~[buf];
assert!(data.buf.is_none());
data.buf = Some(buf);
let bufs = [buf];
unsafe {
assert!(0 == uvll::write(req.native_handle(),
self.native_handle(),
&bufs, write_cb));
bufs, write_cb));
}
// XXX: Freeing immediately after write. Is this ok?
let _v = vec_from_uv_buf(buf);
extern fn write_cb(req: *uvll::uv_write_t, status: c_int) {
let write_request: WriteRequest = NativeHandle::from_native_handle(req);
let mut stream_watcher = write_request.stream();
write_request.delete();
let cb = get_watcher_data(&mut stream_watcher).write_cb.swap_unwrap();
let cb = {
let data = get_watcher_data(&mut stream_watcher);
let _vec = vec_from_uv_buf(data.buf.swap_unwrap());
let cb = data.write_cb.swap_unwrap();
cb
};
let status = status_to_maybe_uv_error(stream_watcher.native_handle(), status);
cb(stream_watcher, status);
}
@ -361,7 +363,7 @@ fn connect_close() {
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
// Connect to a port where nobody is listening
let addr = Ipv4(127, 0, 0, 1, 2923);
let addr = next_test_ip4();
do tcp_watcher.connect(addr) |stream_watcher, status| {
rtdebug!("tcp_watcher.connect!");
assert!(status.is_some());
@ -373,47 +375,13 @@ fn connect_close() {
}
}
#[test]
#[ignore(reason = "need a server to connect to")]
fn connect_read() {
do run_in_bare_thread() {
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
let addr = Ipv4(127, 0, 0, 1, 2924);
do tcp_watcher.connect(addr) |stream_watcher, status| {
let mut stream_watcher = stream_watcher;
rtdebug!("tcp_watcher.connect!");
assert!(status.is_none());
let alloc: AllocCallback = |size| {
vec_to_uv_buf(vec::from_elem(size, 0))
};
do stream_watcher.read_start(alloc)
|stream_watcher, _nread, buf, status| {
let buf = vec_from_uv_buf(buf);
rtdebug!("read cb!");
if status.is_none() {
let _bytes = buf.unwrap();
rtdebug!("%s", bytes.slice(0, nread as uint).to_str());
} else {
rtdebug!("status after read: %s", status.get().to_str());
rtdebug!("closing");
stream_watcher.close(||());
}
}
}
loop_.run();
loop_.close();
}
}
#[test]
fn listen() {
do run_in_bare_thread() {
static MAX: int = 10;
let mut loop_ = Loop::new();
let mut server_tcp_watcher = { TcpWatcher::new(&mut loop_) };
let addr = Ipv4(127, 0, 0, 1, 2925);
let addr = next_test_ip4();
server_tcp_watcher.bind(addr);
let loop_ = loop_;
rtdebug!("listening");

View File

@ -19,9 +19,10 @@ use cell::{Cell, empty_cell};
use cast::transmute;
use super::sched::{Scheduler, local_sched};
#[cfg(test)] use super::sched::Task;
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use uint;
#[cfg(test)] use unstable::run_in_bare_thread;
#[cfg(test)] use super::sched::Task;
#[cfg(test)] use super::test::*;
pub struct UvEventLoop {
uvio: UvIoFactory
@ -334,38 +335,22 @@ impl Stream for UvStream {
}
#[test]
#[ignore(reason = "ffi struct issues")]
fn test_simple_io_no_connect() {
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Task::new(&mut sched.stack_pool) {
let io = unsafe { local_sched::unsafe_borrow_io() };
let addr = Ipv4(127, 0, 0, 1, 2926);
let maybe_chan = io.connect(addr);
assert!(maybe_chan.is_none());
};
sched.task_queue.push_back(task);
sched.run();
do run_in_newsched_task {
let io = unsafe { local_sched::unsafe_borrow_io() };
let addr = next_test_ip4();
let maybe_chan = io.connect(addr);
assert!(maybe_chan.is_none());
}
}
#[test]
#[ignore(reason = "ffi struct issues")]
fn test_simple_tcp_server_and_client() {
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let addr = Ipv4(127, 0, 0, 1, 2929);
do run_in_newsched_task {
let addr = next_test_ip4();
let client_task = ~do Task::new(&mut sched.stack_pool) {
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut stream = io.connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.close();
}
};
let server_task = ~do Task::new(&mut sched.stack_pool) {
// Start the server first so it's listening when we connect
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut listener = io.bind(addr).unwrap();
@ -380,32 +365,25 @@ fn test_simple_tcp_server_and_client() {
stream.close();
listener.close();
}
};
}
// Start the server first so it listens before the client connects
sched.task_queue.push_back(server_task);
sched.task_queue.push_back(client_task);
sched.run();
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut stream = io.connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.close();
}
}
}
}
#[test] #[ignore(reason = "busted")]
fn test_read_and_block() {
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let addr = Ipv4(127, 0, 0, 1, 2930);
do run_in_newsched_task {
let addr = next_test_ip4();
let client_task = ~do Task::new(&mut sched.stack_pool) {
let io = unsafe { local_sched::unsafe_borrow_io() };
let mut stream = io.connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.close();
};
let server_task = ~do Task::new(&mut sched.stack_pool) {
do spawntask_immediately {
let io = unsafe { local_sched::unsafe_borrow_io() };
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
@ -441,36 +419,58 @@ fn test_read_and_block() {
stream.close();
listener.close();
};
}
do spawntask_immediately {
let io = unsafe { local_sched::unsafe_borrow_io() };
let mut stream = io.connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.close();
}
// Start the server first so it listens before the client connects
sched.task_queue.push_back(server_task);
sched.task_queue.push_back(client_task);
sched.run();
}
}
#[test] #[ignore(reason = "needs server")]
#[test]
fn test_read_read_read() {
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let addr = Ipv4(127, 0, 0, 1, 2931);
do run_in_newsched_task {
let addr = next_test_ip4();
static MAX: uint = 500000;
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut buf = [1, .. 2048];
let mut total_bytes_written = 0;
while total_bytes_written < MAX {
stream.write(buf);
total_bytes_written += buf.len();
}
stream.close();
listener.close();
}
}
let client_task = ~do Task::new(&mut sched.stack_pool) {
do spawntask_immediately {
let io = unsafe { local_sched::unsafe_borrow_io() };
let mut stream = io.connect(addr).unwrap();
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;
while total_bytes_read < 500000000 {
while total_bytes_read < MAX {
let nread = stream.read(buf).unwrap();
rtdebug!("read %u bytes", nread as uint);
total_bytes_read += nread;
for uint::range(0, nread) |i| {
assert!(buf[i] == 1);
}
}
rtdebug_!("read %u bytes total", total_bytes_read as uint);
rtdebug!("read %u bytes total", total_bytes_read as uint);
stream.close();
};
sched.task_queue.push_back(client_task);
sched.run();
}
}
}

View File

@ -219,9 +219,9 @@ pub unsafe fn accept(server: *c_void, client: *c_void) -> c_int {
return rust_uv_accept(server as *c_void, client as *c_void);
}
pub unsafe fn write<T>(req: *uv_write_t, stream: *T, buf_in: *~[uv_buf_t], cb: *u8) -> c_int {
let buf_ptr = vec::raw::to_ptr(*buf_in);
let buf_cnt = vec::len(*buf_in) as i32;
pub unsafe fn write<T>(req: *uv_write_t, stream: *T, buf_in: &[uv_buf_t], cb: *u8) -> c_int {
let buf_ptr = vec::raw::to_ptr(buf_in);
let buf_cnt = vec::len(buf_in) as i32;
return rust_uv_write(req as *c_void, stream as *c_void, buf_ptr, buf_cnt, cb);
}
pub unsafe fn read_start(stream: *uv_stream_t, on_alloc: *u8, on_read: *u8) -> c_int {

View File

@ -656,12 +656,14 @@ mod tests {
#[test]
#[cfg(unix)]
#[ignore(reason = "long run time")]
fn test_unforced_destroy_actually_kills() {
test_destroy_actually_kills(false);
}
#[test]
#[cfg(unix)]
#[ignore(reason = "long run time")]
fn test_forced_destroy_actually_kills() {
test_destroy_actually_kills(true);
}

View File

@ -10,6 +10,7 @@
//! Misc low level stuff
use option::{Some, None};
use cast;
use cmp::{Eq, Ord};
use gc;
@ -167,12 +168,30 @@ pub fn log_str<T>(t: &T) -> ~str {
/** Initiate task failure */
pub fn begin_unwind(msg: ~str, file: ~str, line: uint) -> ! {
do str::as_buf(msg) |msg_buf, _msg_len| {
do str::as_buf(file) |file_buf, _file_len| {
use rt::{context, OldTaskContext};
use rt::local_services::unsafe_borrow_local_services;
match context() {
OldTaskContext => {
do str::as_buf(msg) |msg_buf, _msg_len| {
do str::as_buf(file) |file_buf, _file_len| {
unsafe {
let msg_buf = cast::transmute(msg_buf);
let file_buf = cast::transmute(file_buf);
begin_unwind_(msg_buf, file_buf, line as libc::size_t)
}
}
}
}
_ => {
gc::cleanup_stack_for_failure();
unsafe {
let msg_buf = cast::transmute(msg_buf);
let file_buf = cast::transmute(file_buf);
begin_unwind_(msg_buf, file_buf, line as libc::size_t)
let local_services = unsafe_borrow_local_services();
match local_services.unwinder {
Some(ref mut unwinder) => unwinder.begin_unwind(),
None => abort!("failure without unwinder. aborting process")
}
}
}
}

View File

@ -27,7 +27,7 @@ magic.
*/
use prelude::*;
use task::local_data_priv::{local_get, local_pop, local_modify, local_set};
use task::local_data_priv::{local_get, local_pop, local_modify, local_set, Handle};
use task::rt;
/**
@ -53,7 +53,7 @@ pub type LocalDataKey<'self,T> = &'self fn(v: @T);
pub unsafe fn local_data_pop<T:Durable>(
key: LocalDataKey<T>) -> Option<@T> {
local_pop(rt::rust_get_task(), key)
local_pop(Handle::new(), key)
}
/**
* Retrieve a task-local data value. It will also be kept alive in the
@ -62,7 +62,7 @@ pub unsafe fn local_data_pop<T:Durable>(
pub unsafe fn local_data_get<T:Durable>(
key: LocalDataKey<T>) -> Option<@T> {
local_get(rt::rust_get_task(), key)
local_get(Handle::new(), key)
}
/**
* Store a value in task-local data. If this key already has a value,
@ -71,7 +71,7 @@ pub unsafe fn local_data_get<T:Durable>(
pub unsafe fn local_data_set<T:Durable>(
key: LocalDataKey<T>, data: @T) {
local_set(rt::rust_get_task(), key, data)
local_set(Handle::new(), key, data)
}
/**
* Modify a task-local data value. If the function returns 'None', the
@ -81,7 +81,7 @@ pub unsafe fn local_data_modify<T:Durable>(
key: LocalDataKey<T>,
modify_fn: &fn(Option<@T>) -> Option<@T>) {
local_modify(rt::rust_get_task(), key, modify_fn)
local_modify(Handle::new(), key, modify_fn)
}
#[test]

View File

@ -18,6 +18,30 @@ use task::rt;
use task::local_data::LocalDataKey;
use super::rt::rust_task;
use rt::local_services::LocalStorage;
pub enum Handle {
OldHandle(*rust_task),
NewHandle(*mut LocalStorage)
}
impl Handle {
pub fn new() -> Handle {
use rt::{context, OldTaskContext};
use rt::local_services::unsafe_borrow_local_services;
unsafe {
match context() {
OldTaskContext => {
OldHandle(rt::rust_get_task())
}
_ => {
let local_services = unsafe_borrow_local_services();
NewHandle(&mut local_services.storage)
}
}
}
}
}
pub trait LocalData { }
impl<T:Durable> LocalData for @T { }
@ -25,8 +49,8 @@ impl<T:Durable> LocalData for @T { }
impl Eq for @LocalData {
fn eq(&self, other: &@LocalData) -> bool {
unsafe {
let ptr_a: (uint, uint) = cast::transmute(*self);
let ptr_b: (uint, uint) = cast::transmute(*other);
let ptr_a: (uint, uint) = cast::reinterpret_cast(&(*self));
let ptr_b: (uint, uint) = cast::reinterpret_cast(other);
return ptr_a == ptr_b;
}
}
@ -39,19 +63,30 @@ type TaskLocalElement = (*libc::c_void, *libc::c_void, @LocalData);
// Has to be a pointer at outermost layer; the foreign call returns void *.
type TaskLocalMap = @mut ~[Option<TaskLocalElement>];
extern fn cleanup_task_local_map(map_ptr: *libc::c_void) {
fn cleanup_task_local_map(map_ptr: *libc::c_void) {
unsafe {
assert!(!map_ptr.is_null());
// Get and keep the single reference that was created at the
// beginning.
let _map: TaskLocalMap = cast::transmute(map_ptr);
let _map: TaskLocalMap = cast::reinterpret_cast(&map_ptr);
// All local_data will be destroyed along with the map.
}
}
// Gets the map from the runtime. Lazily initialises if not done so already.
unsafe fn get_local_map(handle: Handle) -> TaskLocalMap {
match handle {
OldHandle(task) => get_task_local_map(task),
NewHandle(local_storage) => get_newsched_local_map(local_storage)
}
}
unsafe fn get_task_local_map(task: *rust_task) -> TaskLocalMap {
extern fn cleanup_task_local_map_(map_ptr: *libc::c_void) {
cleanup_task_local_map(map_ptr);
}
// Relies on the runtime initialising the pointer to null.
// Note: The map's box lives in TLS invisibly referenced once. Each time
// we retrieve it for get/set, we make another reference, which get/set
@ -62,7 +97,7 @@ unsafe fn get_task_local_map(task: *rust_task) -> TaskLocalMap {
// Use reinterpret_cast -- transmute would take map away from us also.
rt::rust_set_task_local_data(
task, cast::transmute(map));
rt::rust_task_local_data_atexit(task, cleanup_task_local_map);
rt::rust_task_local_data_atexit(task, cleanup_task_local_map_);
// Also need to reference it an extra time to keep it for now.
let nonmut = cast::transmute::<TaskLocalMap,
@~[Option<TaskLocalElement>]>(map);
@ -77,6 +112,32 @@ unsafe fn get_task_local_map(task: *rust_task) -> TaskLocalMap {
}
}
unsafe fn get_newsched_local_map(local: *mut LocalStorage) -> TaskLocalMap {
match &mut *local {
&LocalStorage(map_ptr, Some(_)) => {
assert!(map_ptr.is_not_null());
let map = cast::transmute(map_ptr);
let nonmut = cast::transmute::<TaskLocalMap,
@~[Option<TaskLocalElement>]>(map);
cast::bump_box_refcount(nonmut);
return map;
}
&LocalStorage(ref mut map_ptr, ref mut at_exit) => {
assert!((*map_ptr).is_null());
let map: TaskLocalMap = @mut ~[];
// Use reinterpret_cast -- transmute would take map away from us also.
*map_ptr = cast::reinterpret_cast(&map);
let at_exit_fn: ~fn(*libc::c_void) = |p|cleanup_task_local_map(p);
*at_exit = Some(at_exit_fn);
// Also need to reference it an extra time to keep it for now.
let nonmut = cast::transmute::<TaskLocalMap,
@~[Option<TaskLocalElement>]>(map);
cast::bump_box_refcount(nonmut);
return map;
}
}
}
unsafe fn key_to_key_value<T:Durable>(
key: LocalDataKey<T>) -> *libc::c_void {
@ -106,10 +167,10 @@ unsafe fn local_data_lookup<T:Durable>(
}
unsafe fn local_get_helper<T:Durable>(
task: *rust_task, key: LocalDataKey<T>,
handle: Handle, key: LocalDataKey<T>,
do_pop: bool) -> Option<@T> {
let map = get_task_local_map(task);
let map = get_local_map(handle);
// Interpreturn our findings from the map
do local_data_lookup(map, key).map |result| {
// A reference count magically appears on 'data' out of thin air. It
@ -128,23 +189,23 @@ unsafe fn local_get_helper<T:Durable>(
pub unsafe fn local_pop<T:Durable>(
task: *rust_task,
handle: Handle,
key: LocalDataKey<T>) -> Option<@T> {
local_get_helper(task, key, true)
local_get_helper(handle, key, true)
}
pub unsafe fn local_get<T:Durable>(
task: *rust_task,
handle: Handle,
key: LocalDataKey<T>) -> Option<@T> {
local_get_helper(task, key, false)
local_get_helper(handle, key, false)
}
pub unsafe fn local_set<T:Durable>(
task: *rust_task, key: LocalDataKey<T>, data: @T) {
handle: Handle, key: LocalDataKey<T>, data: @T) {
let map = get_task_local_map(task);
let map = get_local_map(handle);
// Store key+data as *voids. Data is invisibly referenced once; key isn't.
let keyval = key_to_key_value(key);
// We keep the data in two forms: one as an unsafe pointer, so we can get
@ -152,7 +213,7 @@ pub unsafe fn local_set<T:Durable>(
// own on it can be dropped when the box is destroyed. The unsafe pointer
// does not have a reference associated with it, so it may become invalid
// when the box is destroyed.
let data_ptr = cast::transmute(data);
let data_ptr = cast::reinterpret_cast(&data);
let data_box = @data as @LocalData;
// Construct new entry to store in the map.
let new_entry = Some((keyval, data_ptr, data_box));
@ -174,12 +235,12 @@ pub unsafe fn local_set<T:Durable>(
}
pub unsafe fn local_modify<T:Durable>(
task: *rust_task, key: LocalDataKey<T>,
handle: Handle, key: LocalDataKey<T>,
modify_fn: &fn(Option<@T>) -> Option<@T>) {
// Could be more efficient by doing the lookup work, but this is easy.
let newdata = modify_fn(local_pop(task, key));
let newdata = modify_fn(local_pop(handle, key));
if newdata.is_some() {
local_set(task, key, newdata.unwrap());
local_set(handle, key, newdata.unwrap());
}
}

View File

@ -558,8 +558,31 @@ pub fn yield() {
pub fn failing() -> bool {
//! True if the running task has failed
unsafe {
rt::rust_task_is_unwinding(rt::rust_get_task())
use rt::{context, OldTaskContext};
use rt::local_services::borrow_local_services;
match context() {
OldTaskContext => {
unsafe {
rt::rust_task_is_unwinding(rt::rust_get_task())
}
}
_ => {
let mut unwinding = false;
do borrow_local_services |local| {
unwinding = match local.unwinder {
Some(unwinder) => {
unwinder.unwinding
}
None => {
// Because there is no unwinder we can't be unwinding.
// (The process will abort on failure)
false
}
}
}
return unwinding;
}
}
}
@ -1229,7 +1252,7 @@ fn test_spawn_thread_on_demand() {
#[test]
fn test_simple_newsched_spawn() {
use rt::run_in_newsched_task;
use rt::test::run_in_newsched_task;
do run_in_newsched_task {
spawn(||())

View File

@ -80,7 +80,7 @@ use prelude::*;
use unstable;
use ptr;
use hashmap::HashSet;
use task::local_data_priv::{local_get, local_set};
use task::local_data_priv::{local_get, local_set, OldHandle};
use task::rt::rust_task;
use task::rt;
use task::{Failure, ManualThreads, PlatformThread, SchedOpts, SingleThreaded};
@ -451,7 +451,7 @@ fn gen_child_taskgroup(linked: bool, supervised: bool)
/*##################################################################*
* Step 1. Get spawner's taskgroup info.
*##################################################################*/
let spawner_group = match local_get(spawner, taskgroup_key!()) {
let spawner_group = match local_get(OldHandle(spawner), taskgroup_key!()) {
None => {
// Main task, doing first spawn ever. Lazily initialise here.
let mut members = new_taskset();
@ -463,7 +463,7 @@ fn gen_child_taskgroup(linked: bool, supervised: bool)
// Main task/group has no ancestors, no notifier, etc.
let group =
@TCB(spawner, tasks, AncestorList(None), true, None);
local_set(spawner, taskgroup_key!(), group);
local_set(OldHandle(spawner), taskgroup_key!(), group);
group
}
Some(group) => group
@ -627,7 +627,7 @@ fn spawn_raw_oldsched(opts: TaskOpts, f: ~fn()) {
let group = @TCB(child, child_arc, ancestors,
is_main, notifier);
unsafe {
local_set(child, taskgroup_key!(), group);
local_set(OldHandle(child), taskgroup_key!(), group);
}
// Run the child's body.

View File

@ -17,6 +17,8 @@ use str;
use sys;
use unstable::exchange_alloc;
use cast::transmute;
use rt::{context, OldTaskContext};
use rt::local_services::borrow_local_services;
#[allow(non_camel_case_types)]
pub type rust_task = c_void;
@ -89,7 +91,18 @@ pub unsafe fn exchange_free(ptr: *c_char) {
#[lang="malloc"]
#[inline(always)]
pub unsafe fn local_malloc(td: *c_char, size: uintptr_t) -> *c_char {
return rustrt::rust_upcall_malloc_noswitch(td, size);
match context() {
OldTaskContext => {
return rustrt::rust_upcall_malloc_noswitch(td, size);
}
_ => {
let mut alloc = ::ptr::null();
do borrow_local_services |srv| {
alloc = srv.heap.alloc(td as *c_void, size as uint) as *c_char;
}
return alloc;
}
}
}
// NB: Calls to free CANNOT be allowed to fail, as throwing an exception from
@ -98,7 +111,16 @@ pub unsafe fn local_malloc(td: *c_char, size: uintptr_t) -> *c_char {
#[lang="free"]
#[inline(always)]
pub unsafe fn local_free(ptr: *c_char) {
rustrt::rust_upcall_free_noswitch(ptr);
match context() {
OldTaskContext => {
rustrt::rust_upcall_free_noswitch(ptr);
}
_ => {
do borrow_local_services |srv| {
srv.heap.free(ptr as *c_void);
}
}
}
}
#[lang="borrow_as_imm"]

View File

@ -475,7 +475,7 @@ pub fn core_macros() -> ~str {
{ $c:ident: $in:ty -> $out:ty; } => {
mod $c {
pub mod $c {
fn key(_x: @::core::condition::Handler<$in,$out>) { }
pub static cond :

View File

@ -27,11 +27,11 @@ rust_opaque_box *boxed_region::malloc(type_desc *td, size_t body_size) {
if (live_allocs) live_allocs->prev = box;
live_allocs = box;
LOG(rust_get_current_task(), box,
/*LOG(rust_get_current_task(), box,
"@malloc()=%p with td %p, size %lu==%lu+%lu, "
"align %lu, prev %p, next %p\n",
box, td, total_size, sizeof(rust_opaque_box), body_size,
td->align, box->prev, box->next);
td->align, box->prev, box->next);*/
return box;
}
@ -50,9 +50,9 @@ rust_opaque_box *boxed_region::realloc(rust_opaque_box *box,
if (new_box->next) new_box->next->prev = new_box;
if (live_allocs == box) live_allocs = new_box;
LOG(rust_get_current_task(), box,
/*LOG(rust_get_current_task(), box,
"@realloc()=%p with orig=%p, size %lu==%lu+%lu",
new_box, box, total_size, sizeof(rust_opaque_box), new_size);
new_box, box, total_size, sizeof(rust_opaque_box), new_size);*/
return new_box;
}
@ -74,15 +74,15 @@ void boxed_region::free(rust_opaque_box *box) {
// double frees (kind of).
assert(box->td != NULL);
LOG(rust_get_current_task(), box,
/*LOG(rust_get_current_task(), box,
"@free(%p) with td %p, prev %p, next %p\n",
box, box->td, box->prev, box->next);
box, box->td, box->prev, box->next);*/
if (box->prev) box->prev->next = box->next;
if (box->next) box->next->prev = box->prev;
if (live_allocs == box) live_allocs = box->next;
if (env->poison_on_free) {
if (poison_on_free) {
memset(box_body(box), 0xab, box->td->size);
}

View File

@ -24,7 +24,7 @@ struct rust_env;
* a type descr which describes the payload (what follows the header). */
class boxed_region {
private:
rust_env *env;
bool poison_on_free;
memory_region *backing_region;
rust_opaque_box *live_allocs;
@ -41,8 +41,8 @@ private:
boxed_region& operator=(const boxed_region& rhs);
public:
boxed_region(rust_env *e, memory_region *br)
: env(e)
boxed_region(memory_region *br, bool poison_on_free)
: poison_on_free(poison_on_free)
, backing_region(br)
, live_allocs(NULL)
{}

View File

@ -11,7 +11,6 @@
#include "sync/sync.h"
#include "memory_region.h"
#include "rust_env.h"
#if RUSTRT_TRACK_ALLOCATIONS >= 3
#include <execinfo.h>
@ -35,15 +34,19 @@ void *memory_region::get_data(alloc_header *ptr) {
return (void*)((char *)ptr + HEADER_SIZE);
}
memory_region::memory_region(rust_env *env, bool synchronized) :
_env(env), _parent(NULL), _live_allocations(0),
_detailed_leaks(env->detailed_leaks),
memory_region::memory_region(bool synchronized,
bool detailed_leaks,
bool poison_on_free) :
_parent(NULL), _live_allocations(0),
_detailed_leaks(detailed_leaks),
_poison_on_free(poison_on_free),
_synchronized(synchronized) {
}
memory_region::memory_region(memory_region *parent) :
_env(parent->_env), _parent(parent), _live_allocations(0),
_parent(parent), _live_allocations(0),
_detailed_leaks(parent->_detailed_leaks),
_poison_on_free(parent->_poison_on_free),
_synchronized(parent->_synchronized) {
}
@ -241,7 +244,7 @@ memory_region::claim_alloc(void *mem) {
void
memory_region::maybe_poison(void *mem) {
if (!_env->poison_on_free)
if (!_poison_on_free)
return;
# if RUSTRT_TRACK_ALLOCATIONS >= 1

View File

@ -54,11 +54,11 @@ private:
inline alloc_header *get_header(void *mem);
inline void *get_data(alloc_header *);
rust_env *_env;
memory_region *_parent;
int _live_allocations;
array_list<alloc_header *> _allocation_list;
const bool _detailed_leaks;
const bool _poison_on_free;
const bool _synchronized;
lock_and_signal _lock;
@ -75,7 +75,8 @@ private:
memory_region& operator=(const memory_region& rhs);
public:
memory_region(rust_env *env, bool synchronized);
memory_region(bool synchronized,
bool detailed_leaks, bool poison_on_free);
memory_region(memory_region *parent);
void *malloc(size_t size, const char *tag);
void *realloc(void *mem, size_t size);

View File

@ -856,6 +856,63 @@ rust_initialize_global_state() {
}
}
extern "C" CDECL memory_region*
rust_new_memory_region(uintptr_t synchronized,
uintptr_t detailed_leaks,
uintptr_t poison_on_free) {
return new memory_region((bool)synchronized,
(bool)detailed_leaks,
(bool)poison_on_free);
}
extern "C" CDECL void
rust_delete_memory_region(memory_region *region) {
delete region;
}
extern "C" CDECL boxed_region*
rust_new_boxed_region(memory_region *region,
uintptr_t poison_on_free) {
return new boxed_region(region, poison_on_free);
}
extern "C" CDECL void
rust_delete_boxed_region(boxed_region *region) {
delete region;
}
extern "C" CDECL rust_opaque_box*
rust_boxed_region_malloc(boxed_region *region, type_desc *td, size_t size) {
return region->malloc(td, size);
}
extern "C" CDECL void
rust_boxed_region_free(boxed_region *region, rust_opaque_box *box) {
region->free(box);
}
typedef void *(rust_try_fn)(void*, void*);
extern "C" CDECL uintptr_t
rust_try(rust_try_fn f, void *fptr, void *env) {
try {
f(fptr, env);
} catch (uintptr_t token) {
assert(token != 0);
return token;
}
return 0;
}
extern "C" CDECL void
rust_begin_unwind(uintptr_t token) {
#ifndef __WIN32__
throw token;
#else
abort("failing on win32");
#endif
}
//
// Local Variables:
// mode: C++

View File

@ -38,7 +38,7 @@ rust_sched_loop::rust_sched_loop(rust_scheduler *sched, int id, bool killed) :
sched(sched),
log_lvl(log_debug),
min_stack_size(kernel->env->min_stack_size),
local_region(kernel->env, false),
local_region(false, kernel->env->detailed_leaks, kernel->env->poison_on_free),
// FIXME #2891: calculate a per-scheduler name.
name("main")
{

View File

@ -36,7 +36,7 @@ rust_task::rust_task(rust_sched_loop *sched_loop, rust_task_state state,
kernel(sched_loop->kernel),
name(name),
list_index(-1),
boxed(sched_loop->kernel->env, &local_region),
boxed(&local_region, sched_loop->kernel->env->poison_on_free),
local_region(&sched_loop->local_region),
unwinding(false),
total_stack_sz(0),

View File

@ -165,3 +165,14 @@ extern "C" CDECL TwoDoubles
rust_dbg_extern_identity_TwoDoubles(TwoDoubles u) {
return u;
}
// Generates increasing port numbers for network testing
extern "C" CDECL uintptr_t
rust_dbg_next_port() {
static lock_and_signal dbg_port_lock;
static uintptr_t next_port = 9600;
scoped_lock with(dbg_port_lock);
uintptr_t this_port = next_port;
next_port += 1;
return this_port;
}

View File

@ -287,7 +287,13 @@ upcall_rust_personality(int version,
s_rust_personality_args args = {(_Unwind_Reason_Code)0,
version, actions, exception_class,
ue_header, context};
rust_task *task = rust_get_current_task();
rust_task *task = rust_try_get_current_task();
if (task == NULL) {
// Assuming we're running with the new scheduler
upcall_s_rust_personality(&args);
return args.retval;
}
// The personality function is run on the stack of the
// last function that threw or landed, which is going
@ -324,8 +330,12 @@ upcall_del_stack() {
// needs to acquire the value of the stack pointer
extern "C" CDECL void
upcall_reset_stack_limit() {
rust_task *task = rust_get_current_task();
task->reset_stack_limit();
rust_task *task = rust_try_get_current_task();
if (task != NULL) {
task->reset_stack_limit();
} else {
// We must be in a newsched task
}
}
//

View File

@ -224,4 +224,12 @@ rust_uv_free_ip4_addr
rust_uv_free_ip6_addr
rust_call_nullary_fn
rust_initialize_global_state
rust_dbg_next_port
rust_new_memory_region
rust_delete_memory_region
rust_new_boxed_region
rust_delete_boxed_region
rust_boxed_region_malloc
rust_boxed_region_free
rust_try
rust_begin_unwind