rust/src/libstd/timer.rs

258 lines
8.2 KiB
Rust
Raw Normal View History

//! Utilities that leverage libuv's `uv_timer_*` API
// tjc: forbid deprecated modes again after snap
2012-09-04 13:23:53 -05:00
use uv = uv;
use uv::iotask;
use iotask::IoTask;
use comm = core::comm;
2012-08-14 16:17:27 -05:00
/**
 * Wait for timeout period then send provided value over a channel
 *
 * Useful as the building block for a number of higher-level timer
 * functions.
 *
 * This call blocks the calling task: it waits until the timer callback
 * signals that the timer has fired, sends `val` on `ch`, and then waits
 * until the close callback for the timer handle has run before returning.
 * Callers that must not block should invoke it from a separate task.
 *
 * Is not guaranteed to wait for exactly the specified time, but will wait
 * for *at least* that period of time.
 *
 * # Arguments
 *
 * * `iotask` - a `uv::iotask` that the timer will run on
 * * msecs - a timeout period, in milliseconds, to wait
 * * ch - a channel of type T to send a `val` on
 * * val - a value of type T to send over the provided `ch`
 *
 * # Failure
 *
 * If `uv::ll::timer_init` or `uv::ll::timer_start` returns a non-zero
 * result, the callback handed to `iotask::interact` fails with the text
 * returned by `uv::ll::get_last_err_info`.
 */
pub fn delayed_send<T: Copy Send>(iotask: IoTask,
                                  msecs: uint, ch: comm::Chan<T>, val: T) {
    unsafe {
        // Private port/channel pair used by the two libuv callbacks to
        // report progress back to this task.
        let timer_done_po = core::comm::Port::<()>();
        let timer_done_ch = core::comm::Chan(&timer_done_po);
        let timer_done_ch_ptr = ptr::addr_of(&timer_done_ch);
        // The timer handle lives in this frame; the two `recv` calls below
        // keep the frame alive until the close callback has run.
        let timer = uv::ll::timer_t();
        let timer_ptr = ptr::addr_of(&timer);
        do iotask::interact(iotask) |loop_ptr| unsafe {
            let init_result = uv::ll::timer_init(loop_ptr, timer_ptr);
            if init_result == 0i32 {
                let start_result = uv::ll::timer_start(
                    timer_ptr, delayed_send_cb, msecs, 0u);
                if start_result == 0i32 {
                    // Stash the channel pointer on the handle so that the
                    // callbacks can find it again.
                    uv::ll::set_data_for_uv_handle(
                        timer_ptr,
                        timer_done_ch_ptr as *libc::c_void);
                }
                else {
                    let error_msg = uv::ll::get_last_err_info(loop_ptr);
                    fail ~"timer::delayed_send() start failed: " +
                        error_msg;
                }
            }
            else {
                let error_msg = uv::ll::get_last_err_info(loop_ptr);
                fail ~"timer::delayed_send() init failed: "+error_msg;
            }
        };
        // delayed_send_cb has been processed by libuv
        core::comm::recv(timer_done_po);
        // notify the caller immediately
        core::comm::send(ch, copy(val));
        // uv_close for this timer has been processed
        core::comm::recv(timer_done_po);
    };
}
/**
* Blocks the current task for (at least) the specified time period.
*
* Is not guaranteed to sleep for exactly the specified time, but will sleep
* for *at least* that period of time.
*
* # Arguments
*
* * `iotask` - a `uv::iotask` that the tcp request will run on
* * msecs - an amount of time, in milliseconds, for the current task to block
*/
pub fn sleep(iotask: IoTask, msecs: uint) {
2012-08-27 16:22:25 -05:00
let exit_po = core::comm::Port::<()>();
2012-10-03 16:38:01 -05:00
let exit_ch = core::comm::Chan(&exit_po);
delayed_send(iotask, msecs, exit_ch, ());
2012-08-14 16:17:27 -05:00
core::comm::recv(exit_po);
}
/**
 * Receive on a port for (up to) a specified time, then return an `Option<T>`
 *
 * This call will block to receive on the provided port for up to the
 * specified timeout. Depending on whether the provided port receives in that
 * time period, `recv_timeout` will return an `Option<T>` representing the
 * result.
 *
 * The timeout is driven by `delayed_send` running in a separate task, so
 * that a value arriving on `wait_po` can be returned without waiting for the
 * whole timeout period.
 *
 * # Arguments
 *
 * * `iotask` - a `uv::iotask` that the timer will run on
 * * msecs - an amount of time, in milliseconds, to wait to receive
 * * wait_po - a `core::comm::Port<T>` to receive on
 *
 * # Returns
 *
 * An `Option<T>` representing the outcome of the call. If the call `recv`'d
 * on the provided port in the allotted timeout period, then the result will
 * be a `Some(T)`. If not, then `None` will be returned.
 */
pub fn recv_timeout<T: Copy Send>(iotask: IoTask,
                                  msecs: uint,
                                  wait_po: comm::Port<T>) -> Option<T> {
    let timeout_po = comm::Port::<()>();
    let timeout_ch = comm::Chan(&timeout_po);
    // `delayed_send` blocks its caller until the timer has fired and been
    // closed. Calling it inline would keep this function from ever
    // returning before `msecs` had elapsed, so run it in its own task and
    // let `select2` race the two ports.
    do task::spawn() {
        delayed_send(iotask, msecs, timeout_ch, ());
    };
    // FIXME: This could be written clearer (#2618)
    either::either(
        |left_val| {
            // The timeout port fired first.
            log(debug, fmt!("recv_time .. left_val %?",
                            left_val));
            None
        }, |right_val| {
            // A value arrived on `wait_po`.
            Some(*right_val)
        }, &core::comm::select2(timeout_po, wait_po)
    )
}
// INTERNAL API
2012-07-03 18:32:02 -05:00
// Timer callback registered by `delayed_send` via `uv::ll::timer_start`.
//
// Reads the `comm::Chan<()>` pointer that `delayed_send` stored on the
// handle, stops the timer, sends a unit value on that channel and then
// requests that the handle be closed (with `delayed_send_close_cb` as the
// close callback). Fails with libuv's last error text if `timer_stop`
// returns a non-zero result.
extern fn delayed_send_cb(handle: *uv::ll::uv_timer_t,
                          status: libc::c_int) unsafe {
    log(debug, fmt!("delayed_send_cb handle %? status %?", handle, status));
    let timer_done_ch =
        *(uv::ll::get_data_for_uv_handle(handle) as *comm::Chan<()>);
    let stop_result = uv::ll::timer_stop(handle);
    if stop_result == 0i32 {
        core::comm::send(timer_done_ch, ());
        uv::ll::close(handle, delayed_send_close_cb);
    }
    else {
        let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
        let error_msg = uv::ll::get_last_err_info(loop_ptr);
        // The failing call is `timer_stop`, made on behalf of
        // `delayed_send`; report it as such.
        fail ~"timer::delayed_send() stop failed: "+error_msg;
    }
}
2012-07-03 18:32:02 -05:00
// Close callback passed to `uv::ll::close` by `delayed_send_cb`.
//
// Reads the `comm::Chan<()>` pointer stored on the handle and sends a unit
// value on it; `delayed_send` receives this as its second notification.
extern fn delayed_send_close_cb(handle: *uv::ll::uv_timer_t) unsafe {
    log(debug, fmt!("delayed_send_close_cb handle %?", handle));
    let timer_done_ch =
        *(uv::ll::get_data_for_uv_handle(handle) as *comm::Chan<()>);
    core::comm::send(timer_done_ch, ());
}
#[cfg(test)]
mod test {
    #[legacy_exports];

    // A single short sleep on the global loop.
    #[test]
    fn test_gl_timer_simple_sleep_test() {
        let hl_loop = uv::global_loop::get();
        sleep(hl_loop, 1u);
    }

    // Fifty consecutive short sleeps from one task.
    #[test]
    fn test_gl_timer_sleep_stress1() {
        let hl_loop = uv::global_loop::get();
        for iter::repeat(50u) {
            sleep(hl_loop, 1u);
        }
    }

    // Many tasks sleeping concurrently for random durations. Each spawned
    // task reports on `ch` when it is done and the test waits for all of
    // them.
    #[test]
    fn test_gl_timer_sleep_stress2() {
        let po = core::comm::Port();
        let ch = core::comm::Chan(&po);
        let hl_loop = uv::global_loop::get();

        let repeat = 20u;
        // Each entry is (number of sleeps, exclusive upper bound in ms).
        let spec = {
            ~[(1u, 20u),
              (10u, 10u),
              (20u, 2u)]
        };

        for iter::repeat(repeat) {

            for spec.each |spec| {
                let (times, maxms) = *spec;
                do task::spawn {
                    use rand::*;
                    let rng = Rng();
                    for iter::repeat(times) {
                        sleep(hl_loop, rng.next() as uint % maxms);
                    }
                    core::comm::send(ch, ());
                }
            }
        }

        // One completion message is expected per spawned task.
        for iter::repeat(repeat * spec.len()) {
            core::comm::recv(po)
        }
    }

    // Because valgrind serializes multithreaded programs it can
    // make timing-sensitive tests fail in weird ways. In the
    // next tests we run them many times and expect them to pass
    // the majority of tries.

    #[test]
    #[cfg(ignore)]
    fn test_gl_timer_recv_timeout_before_time_passes() {
        let times = 100;
        let mut successes = 0;
        let mut failures = 0;
        let hl_loop = uv::global_loop::get();

        for iter::repeat(times as uint) {
            task::yield();

            let expected = rand::Rng().gen_str(16u);
            let test_po = core::comm::Port::<~str>();
            let test_ch = core::comm::Chan(&test_po);

            do task::spawn() {
                delayed_send(hl_loop, 1u, test_ch, expected);
            };

            // The value is sent after 1ms, well inside the 10ms timeout.
            match recv_timeout(hl_loop, 10u, test_po) {
              Some(val) => {
                assert val == expected;
                successes += 1;
              }
              _ => failures += 1
            };
        }

        assert successes > times / 2;
    }

    #[test]
    fn test_gl_timer_recv_timeout_after_time_passes() {
        let times = 100;
        let mut successes = 0;
        let mut failures = 0;
        let hl_loop = uv::global_loop::get();

        for iter::repeat(times as uint) {
            let expected = rand::Rng().gen_str(16u);
            let test_po = core::comm::Port::<~str>();
            let test_ch = core::comm::Chan(&test_po);

            do task::spawn() {
                delayed_send(hl_loop, 50u, test_ch, expected);
            };

            // The value is sent after 50ms, long after the 1ms timeout.
            match recv_timeout(hl_loop, 1u, test_po) {
              None => successes += 1,
              _ => failures += 1
            };
        }

        assert successes > times / 2;
    }
}