rust/src/libstd/uv_global_loop.rs

237 lines
7.7 KiB
Rust
Raw Normal View History

// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! A process-wide libuv event loop for library use.
2012-09-04 13:23:53 -05:00
use ll = uv_ll;
use iotask = uv_iotask;
use get_gl = get;
use uv_iotask::{IoTask, spawn_iotask};
use core::either::{Left, Right};
use core::libc;
use core::pipes::{Port, Chan, SharedChan, select2i};
use core::private::global::{global_data_clone_create,
global_data_clone};
use core::private::weak_task::weaken_task;
use core::str;
use core::task::{task, SingleThreaded, spawn};
use core::task;
use core::vec;
use core::clone::Clone;
use core::option::{Some, None};
/**
* Race-free helper to get access to a global task where a libuv
* loop is running.
*
* Use `uv::hl::interact` to do operations against the global
* loop that this function returns.
*
* # Return
*
* * A `hl::high_level_loop` that encapsulates communication with the global
* loop.
*/
pub fn get() -> IoTask {
2012-08-01 19:30:05 -05:00
return get_monitor_task_gl();
}
#[doc(hidden)]
fn get_monitor_task_gl() -> IoTask {
2012-05-24 23:38:48 -05:00
type MonChan = Chan<IoTask>;
2012-05-24 23:38:48 -05:00
struct GlobalIoTask(IoTask);
2012-05-24 23:38:48 -05:00
impl GlobalIoTask: Clone {
fn clone(&self) -> GlobalIoTask {
GlobalIoTask((**self).clone())
}
}
2012-05-24 23:38:48 -05:00
fn key(_: GlobalIoTask) { }
match unsafe { global_data_clone(key) } {
Some(GlobalIoTask(iotask)) => iotask,
None => {
let iotask: IoTask = spawn_loop();
let mut installed = false;
let final_iotask = unsafe {
do global_data_clone_create(key) {
installed = true;
~GlobalIoTask(iotask.clone())
}
};
if installed {
do task().unlinked().spawn() {
unsafe {
debug!("global monitor task starting");
// As a weak task the runtime will notify us
// when to exit
do weaken_task |weak_exit_po| {
debug!("global monitor task is weak");
weak_exit_po.recv();
iotask::exit(&iotask);
debug!("global monitor task is unweak");
};
debug!("global monitor task exiting");
}
}
} else {
iotask::exit(&iotask);
2012-05-24 23:38:48 -05:00
}
match final_iotask {
GlobalIoTask(iotask) => iotask
}
}
}
}
2012-08-29 19:41:38 -05:00
fn spawn_loop() -> IoTask {
let builder = do task().add_wrapper |task_body| {
fn~(move task_body) {
// The I/O loop task also needs to be weak so it doesn't keep
// the runtime alive
unsafe {
do weaken_task |_| {
debug!("global libuv task is now weak");
task_body();
// We don't wait for the exit message on weak_exit_po
// because the monitor task will tell the uv loop when to
// exit
debug!("global libuv task is leaving weakened state");
}
}
}
};
let builder = builder.unlinked();
2012-09-11 19:17:54 -05:00
spawn_iotask(move builder)
}
#[cfg(test)]
mod test {
use core::prelude::*;
use uv::iotask;
use uv::ll;
use uv_global_loop::*;
use core::iter;
use core::libc;
use core::ptr;
2012-12-28 14:46:08 -06:00
use core::task;
use core::cast::transmute;
use core::libc::c_void;
use core::pipes::{stream, SharedChan, Chan};
extern fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) {
unsafe {
let exit_ch_ptr = ll::get_data_for_uv_handle(
timer_ptr as *libc::c_void);
let exit_ch = transmute::<*c_void, ~Chan<bool>>(exit_ch_ptr);
exit_ch.send(true);
log(debug,
fmt!("EXIT_CH_PTR simple_timer_close_cb exit_ch_ptr: %?",
exit_ch_ptr));
}
}
2012-07-03 18:32:02 -05:00
extern fn simple_timer_cb(timer_ptr: *ll::uv_timer_t,
_status: libc::c_int) {
unsafe {
log(debug, ~"in simple timer cb");
ll::timer_stop(timer_ptr);
let hl_loop = &get_gl();
do iotask::interact(hl_loop) |_loop_ptr| {
log(debug, ~"closing timer");
unsafe {
ll::close(timer_ptr, simple_timer_close_cb);
}
log(debug, ~"about to deref exit_ch_ptr");
log(debug, ~"after msg sent on deref'd exit_ch");
};
log(debug, ~"exiting simple timer cb");
}
}
fn impl_uv_hl_simple_timer(iotask: &IoTask) {
unsafe {
let (exit_po, exit_ch) = stream::<bool>();
let exit_ch_ptr: *libc::c_void = transmute(~exit_ch);
log(debug, fmt!("EXIT_CH_PTR newly created exit_ch_ptr: %?",
exit_ch_ptr));
let timer_handle = ll::timer_t();
let timer_ptr = ptr::addr_of(&timer_handle);
do iotask::interact(iotask) |loop_ptr| {
unsafe {
log(debug, ~"user code inside interact loop!!!");
let init_status = ll::timer_init(loop_ptr, timer_ptr);
if(init_status == 0i32) {
ll::set_data_for_uv_handle(
timer_ptr as *libc::c_void,
exit_ch_ptr);
let start_status = ll::timer_start(timer_ptr,
simple_timer_cb,
1u, 0u);
if(start_status != 0i32) {
die!(~"failure on ll::timer_start()");
}
}
else {
die!(~"failure on ll::timer_init()");
}
}
};
exit_po.recv();
log(debug,
~"global_loop timer test: msg recv on exit_po, done..");
}
}
#[test]
fn test_gl_uv_global_loop_high_level_global_timer() {
let hl_loop = &get_gl();
let (exit_po, exit_ch) = stream::<()>();
2012-08-15 16:10:46 -05:00
task::spawn_sched(task::ManualThreads(1u), || {
let hl_loop = &get_gl();
impl_uv_hl_simple_timer(hl_loop);
exit_ch.send(());
});
impl_uv_hl_simple_timer(hl_loop);
exit_po.recv();
}
// keeping this test ignored until some kind of stress-test-harness
// is set up for the build bots
#[test]
#[ignore]
fn test_stress_gl_uv_global_loop_high_level_global_timer() {
let (exit_po, exit_ch) = stream::<()>();
let exit_ch = SharedChan(exit_ch);
let cycles = 5000u;
for iter::repeat(cycles) {
let exit_ch_clone = exit_ch.clone();
2012-08-15 16:10:46 -05:00
task::spawn_sched(task::ManualThreads(1u), || {
let hl_loop = &get_gl();
impl_uv_hl_simple_timer(hl_loop);
exit_ch_clone.send(());
});
};
for iter::repeat(cycles) {
exit_po.recv();
};
log(debug, ~"test_stress_gl_uv_global_loop_high_level_global_timer"+
~" exiting sucessfully!");
}
}