rust/src/libstd/rt/task.rs

496 lines
17 KiB
Rust
Raw Normal View History

2014-01-15 11:34:05 -06:00
// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Language-level runtime services that should reasonably be expected
//! to be available 'everywhere'. Local heaps, GC, unwinding,
//! local storage, and logging. Even a 'freestanding' Rust would likely want
//! to implement this.
use any::AnyOwnExt;
use cast;
use cleanup;
use clone::Clone;
use comm::Sender;
2013-12-05 20:19:06 -06:00
use io::Writer;
use iter::{Iterator, Take};
use kinds::Send;
use local_data;
use ops::Drop;
2013-06-14 01:31:19 -05:00
use option::{Option, Some, None};
use owned::Box;
use prelude::drop;
use result::{Result, Ok, Err};
use rt::Runtime;
use rt::local::Local;
use rt::local_heap::LocalHeap;
use rt::rtio::LocalIo;
use rt::unwind::Unwinder;
use str::SendStr;
use sync::arc::UnsafeArc;
use sync::atomics::{AtomicUint, SeqCst};
use task::{TaskResult, TaskOpts};
use unstable::finally::Finally;
2014-01-15 11:34:05 -06:00
/// The Task struct represents all state associated with a rust
/// task. There are at this point two primary "subtypes" of task,
/// however instead of using a subtype we just have a "task_type" field
/// in the struct. This contains a pointer to another struct that holds
/// the type-specific state.
2013-05-19 03:04:01 -05:00
pub struct Task {
2014-03-27 17:09:47 -05:00
pub heap: LocalHeap,
pub gc: GarbageCollector,
pub storage: LocalStorage,
pub unwinder: Unwinder,
pub death: Death,
pub destroyed: bool,
pub name: Option<SendStr>,
pub stdout: Option<Box<Writer:Send>>,
pub stderr: Option<Box<Writer:Send>>,
imp: Option<Box<Runtime:Send>>,
}
/// Type of the `gc` field of `Task`. It is a unit struct and carries no state.
pub struct GarbageCollector;
/// Task-local storage: wraps the task's `local_data::Map`, or `None` when no
/// map is present (a task created by `Task::new` starts with `None`).
pub struct LocalStorage(pub Option<local_data::Map>);
/// A handle to a blocked task. Usually this means having the Box<Task>
/// pointer by ownership, but if the task is killable, a killer can steal it
/// at any time.
pub enum BlockedTask {
/// The handle exclusively owns the blocked task.
Owned(Box<Task>),
/// The task pointer is stored as a `uint` inside a shared atomic. `wake`
/// swaps that value with 0, so only the handle that observes a non-zero
/// value receives the task.
Shared(UnsafeArc<AtomicUint>),
}
pub enum DeathAction {
/// Action to be done with the exit code. If set, also makes the task wait
/// until all its watched children exit before collecting the status.
2014-04-07 15:30:48 -05:00
Execute(proc(TaskResult):Send),
/// A channel to send the result of the task on when the task exits
SendMessage(Sender<TaskResult>),
}
/// Per-task state related to task death, killing, failure, etc.
pub struct Death {
2014-03-27 17:09:47 -05:00
pub on_exit: Option<DeathAction>,
}
pub struct BlockedTasks {
2014-03-27 17:09:47 -05:00
inner: UnsafeArc<AtomicUint>,
}
2013-05-19 03:04:01 -05:00
impl Task {
pub fn new() -> Task {
2013-05-19 03:04:01 -05:00
Task {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(None),
unwinder: Unwinder::new(),
death: Death::new(),
destroyed: false,
2013-07-30 18:20:59 -05:00
name: None,
stdout: None,
stderr: None,
imp: None,
}
}
/// Executes the given closure as if it's running inside this task. The task
/// is consumed upon entry, and the destroyed task is returned from this
/// function in order for the caller to free. This function is guaranteed to
/// not unwind because the closure specified is run inside of a `rust_try`
/// block. (this is the only try/catch block in the world).
///
/// This function is *not* meant to be abused as a "try/catch" block. This
/// is meant to be used at the absolute boundaries of a task's lifetime, and
/// only for that purpose.
///
/// The sequence is: place the task in TLS, run `f` through the unwinder,
/// perform cleanup (flush outputs, drop task-local storage, annihilate
/// remaining boxes, flush outputs again), hand the result to
/// `death.collect_failure`, then take the task back out of TLS, mark it as
/// destroyed and return it.
pub fn run(~self, mut f: ||) -> Box<Task> {
// Need to put ourselves into TLS, but also need access to the unwinder.
// Unsafely get a handle to the task so we can continue to use it after
// putting it in tls (so we can invoke the unwinder).
//
// The transmute reinterprets `&Box<Task>` as `&*mut Task` and copies the raw
// pointer out; ownership of the box itself is not affected.
let handle: *mut Task = unsafe {
*cast::transmute::<&Box<Task>, &*mut Task>(&self)
};
Local::put(self);
// The only try/catch block in the world. Attempt to run the task's
// client-specified code and catch any failures.
let try_block = || {
// Run the task main function, then do some cleanup.
f.finally(|| {
// Takes both writers out of the task, releases the TLS borrow and then
// flushes them. The results of `flush` are deliberately ignored, hence
// the `allow` attribute.
#[allow(unused_must_use)]
fn close_outputs() {
let mut task = Local::borrow(None::<Task>);
let stderr = task.stderr.take();
let stdout = task.stdout.take();
// NOTE(review): the borrow is presumably released before flushing so
// that writer code can access the local task again - confirm.
drop(task);
match stdout { Some(mut w) => { w.flush(); }, None => {} }
match stderr { Some(mut w) => { w.flush(); }, None => {} }
}
// First, flush/destroy the user stdout/logger because these
// destructors can run arbitrary code.
close_outputs();
// Second, destroy task-local storage. This may run user dtors.
//
// FIXME #8302: Dear diary. I'm so tired and confused.
// There's some interaction in rustc between the box
// annihilator and the TLS dtor by which TLS is
// accessed from annihilated box dtors *after* TLS is
// destroyed. Somehow setting TLS back to null, as the
// old runtime did, makes this work, but I don't currently
// understand how. I would expect that, if the annihilator
// reinvokes TLS while TLS is uninitialized, that
// TLS would be reinitialized but never destroyed,
// but somehow this works. I have no idea what's going
// on but this seems to make things magically work. FML.
//
// (added after initial comment) A possible interaction here is
// that the destructors for the objects in TLS themselves invoke
// TLS, or possibly some destructors for those objects being
// annihilated invoke TLS. Sadly these two operations seemed to
// be intertwined, and miraculously work for now...
let mut task = Local::borrow(None::<Task>);
// Move the map out of the task (leaving `None` behind) so that it is
// dropped only after the borrow of the task has been released.
let storage_map = {
let &LocalStorage(ref mut optmap) = &mut task.storage;
optmap.take()
};
drop(task);
drop(storage_map);
// Destroy remaining boxes. Also may run user dtors.
unsafe { cleanup::annihilate(); }
// Finally, just in case user dtors printed/logged during TLS
// cleanup and annihilation, re-destroy stdout and the logger.
// Note that these will have been initialized with a
// runtime-provided type which we have control over what the
// destructor does.
close_outputs();
})
};
// Run the body through the unwinder, using the raw pointer obtained before
// the task was moved into TLS.
unsafe { (*handle).unwinder.try(try_block); }
// Here we must unsafely borrow the task in order to not remove it from
// TLS. When collecting failure, we may attempt to send on a channel (or
// just run arbitrary code), so we must be sure to still have a local
// task in TLS.
unsafe {
let me: *mut Task = Local::unsafe_borrow();
(*me).death.collect_failure((*me).unwinder.result());
}
// Take ownership back from TLS and mark the task as destroyed; the `Drop`
// implementation for `Task` asserts on this flag.
let mut me: Box<Task> = Local::take();
me.destroyed = true;
return me;
}
/// Inserts a runtime object into this task, transferring ownership to the
/// task. It is illegal to replace a previous runtime object in this task
/// with this argument.
///
/// The "no previous runtime" rule is enforced with an assertion, so calling
/// this while a runtime is already installed trips that assertion.
pub fn put_runtime(&mut self, ops: Box<Runtime:Send>) {
// A runtime must not already be present.
assert!(self.imp.is_none());
self.imp = Some(ops);
}
/// Attempts to extract the runtime as a specific type. If the runtime does
/// not have the provided type, then the runtime is not removed. If the
/// runtime does have the specified type, then it is removed and returned
/// (transfer of ownership).
///
/// It is recommended to only use this method when *absolutely necessary*.
/// This function may not be available in the future.
///
/// Returns `Some(runtime)` when the installed runtime is a `T`, and `None`
/// otherwise (in which case the runtime is put back into the task).
/// NOTE(review): `take_unwrap` presumably fails when no runtime is installed
/// at all - confirm against `Option::take_unwrap`.
pub fn maybe_take_runtime<T: 'static>(&mut self) -> Option<Box<T>> {
// This is a terrible, terrible function. The general idea here is to
// take the runtime, cast it to Box<Any>, check if it has the right
// type, and then re-cast it back if necessary. The method of doing
// this is pretty sketchy and involves shuffling vtables of trait
// objects around, but it gets the job done.
//
// FIXME: This function is a serious code smell and should be avoided at
// all costs. I have yet to think of a method to avoid this
// function, and I would be saddened if more usage of the function
// crops up.
unsafe {
// Temporarily move the runtime out of `self.imp`.
let imp = self.imp.take_unwrap();
// View the boxed trait object as a pair of words and remember the first
// one, which this code treats as the vtable of the `Runtime` object.
let &(vtable, _): &(uint, uint) = cast::transmute(&imp);
match imp.wrap().move::<T>() {
Ok(t) => Some(t),
Err(t) => {
// Wrong type: keep the object word of the returned value and pair it
// with the saved vtable word to rebuild the original trait object,
// then reinstall it.
let (_, obj): (uint, uint) = cast::transmute(t);
let obj: Box<Runtime:Send> =
cast::transmute((vtable, obj));
self.put_runtime(obj);
None
}
}
}
}
/// Spawns a sibling to this task. The newly spawned task is configured with
/// the `opts` structure and will run `f` as the body of its code.
2014-04-07 15:30:48 -05:00
pub fn spawn_sibling(mut ~self, opts: TaskOpts, f: proc():Send) {
let ops = self.imp.take_unwrap();
ops.spawn_sibling(self, opts, f)
}
/// Deschedules the current task, invoking `f` `amt` times. Prefer the
/// communication primitives in `std::comm` over calling this directly.
///
/// The runtime is taken out of the task and then given the task itself,
/// together with `amt` and `f`, through its own `deschedule` method.
pub fn deschedule(mut ~self, amt: uint,
                  f: |BlockedTask| -> Result<(), BlockedTask>) {
    let runtime = self.imp.take_unwrap();
    runtime.deschedule(amt, self, f)
}
2014-02-17 05:53:45 -06:00
/// Wakes up a previously blocked task. This function can only be called on
/// tasks that were previously blocked in `deschedule`.
///
/// The runtime is taken out of the task and the task is handed to the
/// runtime's `reawaken` method.
pub fn reawaken(mut ~self) {
    let runtime = self.imp.take_unwrap();
    runtime.reawaken(self);
}
/// Gives other tasks a chance to run by yielding control of this task. The
/// call returns eventually, though not necessarily right away.
///
/// The runtime is taken out of the task and the task is handed to the
/// runtime's `yield_now` method.
pub fn yield_now(mut ~self) {
    let runtime = self.imp.take_unwrap();
    runtime.yield_now(self);
}
/// Like `yield_now`, but the runtime is free to return immediately without
/// actually yielding.
///
/// The runtime is taken out of the task and the task is handed to the
/// runtime's `maybe_yield` method.
pub fn maybe_yield(mut ~self) {
    let runtime = self.imp.take_unwrap();
    runtime.maybe_yield(self);
}
/// Asks this task's runtime for a handle to its I/O factory. The factory
/// may not always be available, which is why the result is an `Option`.
///
/// NOTE(review): the runtime is reached through `get_mut_ref`, which
/// presumably fails when no runtime is installed - confirm.
pub fn local_io<'a>(&'a mut self) -> Option<LocalIo<'a>> {
    let runtime = self.imp.get_mut_ref();
    runtime.local_io()
}
/// Returns the stack bounds for this task in (lo, hi) format, as reported
/// by the task's runtime.
///
/// NOTE(review): the runtime is reached through `get_ref`, which presumably
/// fails when no runtime is installed - confirm.
pub fn stack_bounds(&self) -> (uint, uint) {
    let runtime = self.imp.get_ref();
    runtime.stack_bounds()
}
/// Reports whether this task is allowed to block the OS thread it is
/// running on. The answer comes from the task's runtime.
///
/// NOTE(review): the runtime is reached through `get_ref`, which presumably
/// fails when no runtime is installed - confirm.
pub fn can_block(&self) -> bool {
    let runtime = self.imp.get_ref();
    runtime.can_block()
}
}
2013-05-19 03:04:01 -05:00
impl Drop for Task {
2013-09-16 20:18:07 -05:00
fn drop(&mut self) {
rtdebug!("called drop for a task: {}", self as *mut Task as uint);
rtassert!(self.destroyed);
}
}
impl Iterator<BlockedTask> for BlockedTasks {
    /// Produces another `Shared` handle referring to the same atomic. This
    /// never returns `None`; callers bound the iteration themselves (see
    /// `make_selectable`, which applies `take`).
    fn next(&mut self) -> Option<BlockedTask> {
        let handle = self.inner.clone();
        Some(Shared(handle))
    }
}
impl BlockedTask {
/// Returns Some if the task was successfully woken; None if already killed.
///
/// An `Owned` handle simply gives up its task. A `Shared` handle atomically
/// swaps the stored value with 0: a previous value of 0 means the task has
/// already been taken, any other value is turned back into the task box.
pub fn wake(self) -> Option<Box<Task>> {
    match self {
        Owned(task) => Some(task),
        Shared(arc) => {
            let raw = unsafe { (*arc.get()).swap(0, SeqCst) };
            if raw == 0 {
                None
            } else {
                Some(unsafe { cast::transmute(raw) })
            }
        }
    }
}
// Discards this handle. Outside of test builds this does nothing beyond
// consuming `self`; in test builds it additionally asserts that `wake`
// returns `None`.
//
// This assertion has two flavours because the wake involves an atomic op.
// In the faster version, destructors will fail dramatically instead.
#[cfg(not(test))] pub fn trash(self) { }
#[cfg(test)] pub fn trash(self) { assert!(self.wake().is_none()); }
/// Create a blocked task handle by wrapping `task` in the `Owned` variant.
///
/// NOTE(review): earlier wording said "unless the task was already killed",
/// but the code shown here always returns `Owned(task)`.
pub fn block(task: Box<Task>) -> BlockedTask {
Owned(task)
}
/// Converts one blocked task handle to a list of many handles to the same.
///
/// An `Owned` task is turned into a `uint` and stored in a new shared
/// atomic; a `Shared` handle reuses (a clone of) its existing atomic. The
/// result yields `num_handles` `Shared` handles to that atomic.
pub fn make_selectable(self, num_handles: uint) -> Take<BlockedTasks> {
    let shared = match self {
        Shared(existing) => existing.clone(),
        Owned(task) => {
            let raw: uint = unsafe { cast::transmute(task) };
            UnsafeArc::new(AtomicUint::new(raw))
        }
    };
    let handles = BlockedTasks { inner: shared };
    handles.take(num_handles)
}
/// Convert to an unsafe uint value. Useful for storing in a pipe's state
/// flag.
#[inline]
pub unsafe fn cast_to_uint(self) -> uint {
match self {
Owned(task) => {
let blocked_task_ptr: uint = cast::transmute(task);
rtassert!(blocked_task_ptr & 0x1 == 0);
blocked_task_ptr
}
Shared(arc) => {
2014-04-25 03:08:02 -05:00
let blocked_task_ptr: uint = cast::transmute(box arc);
rtassert!(blocked_task_ptr & 0x1 == 0);
blocked_task_ptr | 0x1
}
}
}
/// Convert from an unsafe uint value. Useful for retrieving a pipe's state
/// flag.
///
/// The low bit selects the variant: when it is set, the remaining bits are
/// interpreted as a boxed `UnsafeArc` which is unboxed into `Shared`; when
/// it is clear, the whole value is interpreted as the task box of `Owned`.
#[inline]
pub unsafe fn cast_from_uint(blocked_task_ptr: uint) -> BlockedTask {
    let tagged_shared = blocked_task_ptr & 0x1 != 0;
    if tagged_shared {
        let untagged = blocked_task_ptr & !1;
        let boxed: Box<UnsafeArc<AtomicUint>> = cast::transmute(untagged);
        Shared(*boxed)
    } else {
        Owned(cast::transmute(blocked_task_ptr))
    }
}
}
impl Death {
pub fn new() -> Death {
Death { on_exit: None, }
}
/// Collect failure exit codes from children and propagate them to a parent.
pub fn collect_failure(&mut self, result: TaskResult) {
match self.on_exit.take() {
Some(Execute(f)) => f(result),
std: Make std::comm return types consistent There are currently a number of return values from the std::comm methods, not all of which are necessarily completely expressive: Sender::try_send(t: T) -> bool This method currently doesn't transmit back the data `t` if the send fails due to the other end having disconnected. Additionally, this shares the name of the synchronous try_send method, but it differs in semantics in that it only has one failure case, not two (the buffer can never be full). SyncSender::try_send(t: T) -> TrySendResult<T> This method accurately conveys all possible information, but it uses a custom type to the std::comm module with no convenience methods on it. Additionally, if you want to inspect the result you're forced to import something from `std::comm`. SyncSender::send_opt(t: T) -> Option<T> This method uses Some(T) as an "error value" and None as a "success value", but almost all other uses of Option<T> have Some/None the other way Receiver::try_recv(t: T) -> TryRecvResult<T> Similarly to the synchronous try_send, this custom return type is lacking in terms of usability (no convenience methods). With this number of drawbacks in mind, I believed it was time to re-work the return types of these methods. The new API for the comm module is: Sender::send(t: T) -> () Sender::send_opt(t: T) -> Result<(), T> SyncSender::send(t: T) -> () SyncSender::send_opt(t: T) -> Result<(), T> SyncSender::try_send(t: T) -> Result<(), TrySendError<T>> Receiver::recv() -> T Receiver::recv_opt() -> Result<T, ()> Receiver::try_recv() -> Result<T, TryRecvError> The notable changes made are: * Sender::try_send => Sender::send_opt. This renaming brings the semantics in line with the SyncSender::send_opt method. An asychronous send only has one failure case, unlike the synchronous try_send method which has two failure cases (full/disconnected). * Sender::send_opt returns the data back to the caller if the send is guaranteed to fail. 
This method previously returned `bool`, but then it was unable to retrieve the data if the data was guaranteed to fail to send. There is still a race such that when `Ok(())` is returned the data could still fail to be received, but that's inherent to an asynchronous channel. * Result is now the basis of all return values. This not only adds lots of convenience methods to all return values for free, but it also means that you can inspect the return values with no extra imports (Ok/Err are in the prelude). Additionally, it's now self documenting when something failed or not because the return value has "Err" in the name. Things I'm a little uneasy about: * The methods send_opt and recv_opt are not returning options, but rather results. I felt more strongly that Option was the wrong return type than the _opt prefix was wrong, and I coudn't think of a much better name for these methods. One possible way to think about them is to read the _opt suffix as "optionally". * Result<T, ()> is often better expressed as Option<T>. This is only applicable to the recv_opt() method, but I thought it would be more consistent for everything to return Result rather than one method returning an Option. Despite my two reasons to feel uneasy, I feel much better about the consistency in return values at this point, and I think the only real open question is if there's a better suffix for {send,recv}_opt. Closes #11527
2014-04-10 12:53:49 -05:00
Some(SendMessage(ch)) => { let _ = ch.send_opt(result); }
None => {}
}
}
}
// Intentionally empty destructor: per the comment in the body, the impl exists
// only to make `Death` noncopyable.
impl Drop for Death {
fn drop(&mut self) {
// make this type noncopyable
}
}
#[cfg(test)]
mod test {
use super::*;
2013-12-05 20:19:06 -06:00
use prelude::*;
use task;
#[test]
fn local_heap() {
let a = @5;
let b = a;
assert!(*a == 5);
assert!(*b == 5);
}
#[test]
fn tls() {
use local_data;
local_data_key!(key: @~str)
2014-04-15 20:17:48 -05:00
local_data::set(key, @"data".to_owned());
assert!(*local_data::get(key, |k| k.map(|k| *k)).unwrap() == "data".to_owned());
local_data_key!(key2: @~str)
2014-04-15 20:17:48 -05:00
local_data::set(key2, @"data".to_owned());
assert!(*local_data::get(key2, |k| k.map(|k| *k)).unwrap() == "data".to_owned());
}
#[test]
fn unwind() {
let result = task::try(proc()());
rtdebug!("trying first assert");
assert!(result.is_ok());
let result = task::try::<()>(proc() fail!());
rtdebug!("trying second assert");
assert!(result.is_err());
}
#[test]
fn rng() {
use rand::{StdRng, Rng};
let mut r = StdRng::new().ok().unwrap();
let _ = r.next_u32();
}
#[test]
fn logging() {
info!("here i am. logging in a newsched task");
}
#[test]
fn comm_stream() {
let (tx, rx) = channel();
tx.send(10);
assert!(rx.recv() == 10);
}
2013-06-14 01:31:19 -05:00
2013-06-20 20:26:56 -05:00
#[test]
fn comm_shared_chan() {
let (tx, rx) = channel();
tx.send(10);
assert!(rx.recv() == 10);
2013-06-20 20:26:56 -05:00
}
#[test]
fn heap_cycles() {
use cell::RefCell;
use option::{Option, Some, None};
struct List {
next: Option<@RefCell<List>>,
}
let a = @RefCell::new(List { next: None });
let b = @RefCell::new(List { next: Some(a) });
{
let mut a = a.borrow_mut();
a.next = Some(b);
}
}
#[test]
#[should_fail]
fn test_begin_unwind() {
use rt::unwind::begin_unwind;
begin_unwind("cause", file!(), line!())
}
// Task blocking tests
#[test]
fn block_and_wake() {
2014-04-25 03:08:02 -05:00
let task = box Task::new();
let mut task = BlockedTask::block(task).wake().unwrap();
task.destroyed = true;
}
2013-05-08 14:26:34 -05:00
}