diff --git a/src/libcore/logging.rs b/src/libcore/logging.rs index 70195afb20a..4308d22548f 100644 --- a/src/libcore/logging.rs +++ b/src/libcore/logging.rs @@ -19,6 +19,7 @@ use repr; use vec; use cast; +use str; /// Turns on logging to stdout globally pub fn console_on() { @@ -57,7 +58,7 @@ pub fn log_type(level: u32, object: &T) { } _ => { // XXX: Bad allocation - let msg = bytes.to_str(); + let msg = str::from_bytes(bytes); newsched_log_str(msg); } } diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index ce3fb71ef2c..b2ba6d7d3c4 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -18,7 +18,7 @@ mod sched; /// Thread-local access to the current Scheduler -mod local_sched; +pub mod local_sched; /// Synchronous I/O #[path = "io/mod.rs"] @@ -68,6 +68,10 @@ /// Reference counting pub mod rc; +/// A simple single-threaded channel type for passing buffered data between +/// scheduler and task context +pub mod tube; + /// Set up a default runtime configuration, given compiler-supplied arguments. /// /// This is invoked by the `start` _language item_ (unstable::lang) to diff --git a/src/libcore/rt/tube.rs b/src/libcore/rt/tube.rs new file mode 100644 index 00000000000..ef376199fcb --- /dev/null +++ b/src/libcore/rt/tube.rs @@ -0,0 +1,182 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! A very simple unsynchronized channel type for sending buffered data from +//! scheduler context to task context. +//! +//! XXX: This would be safer to use if split into two types like Port/Chan + +use option::*; +use clone::Clone; +use super::rc::RC; +use rt::sched::Task; +use rt::{context, TaskContext, SchedulerContext}; +use rt::local_sched; + +struct TubeState { + blocked_task: Option<~Task>, + buf: ~[T] +} + +pub struct Tube { + p: RC> +} + +impl Tube { + pub fn new() -> Tube { + Tube { + p: RC::new(TubeState { + blocked_task: None, + buf: ~[] + }) + } + } + + pub fn send(&mut self, val: T) { + rtdebug!("tube send"); + assert!(context() == SchedulerContext); + + unsafe { + let state = self.p.unsafe_borrow_mut(); + (*state).buf.push(val); + + if (*state).blocked_task.is_some() { + // There's a waiting task. Wake it up + rtdebug!("waking blocked tube"); + let task = (*state).blocked_task.swap_unwrap(); + let sched = local_sched::take(); + sched.resume_task_immediately(task); + } + } + } + + pub fn recv(&mut self) -> T { + assert!(context() == TaskContext); + + unsafe { + let state = self.p.unsafe_borrow_mut(); + if !(*state).buf.is_empty() { + return (*state).buf.shift(); + } else { + // Block and wait for the next message + rtdebug!("blocking on tube recv"); + assert!(self.p.refcount() > 1); // There better be somebody to wake us up + assert!((*state).blocked_task.is_none()); + let sched = local_sched::take(); + do sched.deschedule_running_task_and_then |task| { + (*state).blocked_task = Some(task); + } + rtdebug!("waking after tube recv"); + let buf = &mut (*state).buf; + assert!(!buf.is_empty()); + return buf.shift(); + } + } + } +} + +impl Clone for Tube { + fn clone(&self) -> Tube { + Tube { p: self.p.clone() } + } +} + +#[cfg(test)] +mod test { + use int; + use cell::Cell; + use rt::local_sched; + use rt::test::*; + use rt::rtio::EventLoop; + use super::*; + + #[test] + fn simple_test() { + do run_in_newsched_task { + let mut tube: Tube = Tube::new(); + let tube_clone = tube.clone(); + let tube_clone_cell = Cell(tube_clone); + let sched = local_sched::take(); + do sched.deschedule_running_task_and_then |task| { + let mut tube_clone = tube_clone_cell.take(); + tube_clone.send(1); + let sched = local_sched::take(); + sched.resume_task_immediately(task); + } + + assert!(tube.recv() == 1); + } + } + + #[test] + fn blocking_test() { + do run_in_newsched_task { + let mut tube: Tube = Tube::new(); + let tube_clone = tube.clone(); + let tube_clone = Cell(Cell(Cell(tube_clone))); + let sched = local_sched::take(); + do sched.deschedule_running_task_and_then |task| { + let tube_clone = tube_clone.take(); + do local_sched::borrow |sched| { + let tube_clone = tube_clone.take(); + do sched.event_loop.callback { + let mut tube_clone = tube_clone.take(); + // The task should be blocked on this now and + // sending will wake it up. + tube_clone.send(1); + } + } + let sched = local_sched::take(); + sched.resume_task_immediately(task); + } + + assert!(tube.recv() == 1); + } + } + + #[test] + fn many_blocking_test() { + static MAX: int = 100; + + do run_in_newsched_task { + let mut tube: Tube = Tube::new(); + let tube_clone = tube.clone(); + let tube_clone = Cell(tube_clone); + let sched = local_sched::take(); + do sched.deschedule_running_task_and_then |task| { + callback_send(tube_clone.take(), 0); + + fn callback_send(tube: Tube, i: int) { + if i == 100 { return; } + + let tube = Cell(Cell(tube)); + do local_sched::borrow |sched| { + let tube = tube.take(); + do sched.event_loop.callback { + let mut tube = tube.take(); + // The task should be blocked on this now and + // sending will wake it up. + tube.send(i); + callback_send(tube, i + 1); + } + } + } + + let sched = local_sched::take(); + sched.resume_task_immediately(task); + } + + for int::range(0, MAX) |i| { + let j = tube.recv(); + assert!(j == i); + } + } + } +} diff --git a/src/libcore/sys.rs b/src/libcore/sys.rs index a27b6fe615f..50a739ec67d 100644 --- a/src/libcore/sys.rs +++ b/src/libcore/sys.rs @@ -202,10 +202,12 @@ fn fail_with(cause: &'static str, file: &'static str, line: uint) -> ! { // FIXME #4427: Temporary until rt::rt_fail_ goes away pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! { - use rt::{context, OldTaskContext}; - use rt::local_services::unsafe_borrow_local_services; + use option::Option; + use rt::{context, OldTaskContext, TaskContext}; + use rt::local_services::{unsafe_borrow_local_services, Unwinder}; - match context() { + let context = context(); + match context { OldTaskContext => { unsafe { gc::cleanup_stack_for_failure(); @@ -214,11 +216,26 @@ pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! { } } _ => { - // XXX: Need to print the failure message - gc::cleanup_stack_for_failure(); unsafe { + // XXX: Bad re-allocations. fail! needs some refactoring + let msg = str::raw::from_c_str(msg); + let file = str::raw::from_c_str(file); + + let outmsg = fmt!("%s at line %i of file %s", msg, line as int, file); + + // XXX: Logging doesn't work correctly in non-task context because it + // invokes the local heap + if context == TaskContext { + error!(outmsg); + } else { + rtdebug!("%s", outmsg); + } + + gc::cleanup_stack_for_failure(); + let local_services = unsafe_borrow_local_services(); - match (*local_services).unwinder { + let unwinder: &mut Option = &mut (*local_services).unwinder; + match *unwinder { Some(ref mut unwinder) => unwinder.begin_unwind(), None => abort!("failure without unwinder. aborting process") }