// Copyright 2012 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. //! Utilities that leverage libuv's `uv_timer_*` API use uv; use uv::iotask; use uv::iotask::IoTask; use core::either; use core::libc; use core::libc::c_void; use core::cast::transmute; use core::comm::{stream, Chan, SharedChan, Port, select2i}; use core::prelude::*; use core::ptr; /** * Wait for timeout period then send provided value over a channel * * This call returns immediately. Useful as the building block for a number * of higher-level timer functions. * * Is not guaranteed to wait for exactly the specified time, but will wait * for *at least* that period of time. * * # Arguments * * * `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on * * msecs - a timeout period, in milliseconds, to wait * * ch - a channel of type T to send a `val` on * * val - a value of type T to send over the provided `ch` */ pub fn delayed_send(iotask: &IoTask, msecs: uint, ch: &Chan, val: T) { unsafe { let (timer_done_po, timer_done_ch) = stream::<()>(); let timer_done_ch = SharedChan(timer_done_ch); let timer = uv::ll::timer_t(); let timer_ptr = ptr::addr_of(&timer); do iotask::interact(iotask) |loop_ptr| { unsafe { let init_result = uv::ll::timer_init(loop_ptr, timer_ptr); if (init_result == 0i32) { let start_result = uv::ll::timer_start( timer_ptr, delayed_send_cb, msecs, 0u); if (start_result == 0i32) { // Note: putting the channel into a ~ // to cast to *c_void let timer_done_ch_clone = ~timer_done_ch.clone(); let timer_done_ch_ptr = transmute::< ~SharedChan<()>, *c_void>( timer_done_ch_clone); uv::ll::set_data_for_uv_handle( timer_ptr, timer_done_ch_ptr); } else { let error_msg = uv::ll::get_last_err_info( loop_ptr); fail!(~"timer::delayed_send() start failed: " + error_msg); } } else { let error_msg = uv::ll::get_last_err_info(loop_ptr); fail!(~"timer::delayed_send() init failed: " + error_msg); } } }; // delayed_send_cb has been processed by libuv timer_done_po.recv(); // notify the caller immediately ch.send(val); // uv_close for this timer has been processed timer_done_po.recv(); }; } /** * Blocks the current task for (at least) the specified time period. * * Is not guaranteed to sleep for exactly the specified time, but will sleep * for *at least* that period of time. * * # Arguments * * * `iotask` - a `uv::iotask` that the tcp request will run on * * msecs - an amount of time, in milliseconds, for the current task to block */ pub fn sleep(iotask: &IoTask, msecs: uint) { let (exit_po, exit_ch) = stream::<()>(); delayed_send(iotask, msecs, &exit_ch, ()); exit_po.recv(); } /** * Receive on a port for (up to) a specified time, then return an `Option` * * This call will block to receive on the provided port for up to the * specified timeout. Depending on whether the provided port receives in that * time period, `recv_timeout` will return an `Option` representing the * result. * * # Arguments * * * `iotask' - `uv::iotask` that the tcp request will run on * * msecs - an mount of time, in milliseconds, to wait to receive * * wait_port - a `core::comm::port` to receive on * * # Returns * * An `Option` representing the outcome of the call. If the call `recv`'d * on the provided port in the allotted timeout period, then the result will * be a `Some(T)`. If not, then `None` will be returned. */ pub fn recv_timeout(iotask: &IoTask, msecs: uint, wait_po: &Port) -> Option { let (timeout_po, timeout_ch) = stream::<()>(); delayed_send(iotask, msecs, &timeout_ch, ()); // FIXME: This could be written clearer (#2618) either::either( |_| { None }, |_| { Some(wait_po.recv()) }, &select2i(&timeout_po, wait_po) ) } // INTERNAL API extern fn delayed_send_cb(handle: *uv::ll::uv_timer_t, status: libc::c_int) { unsafe { debug!( "delayed_send_cb handle %? status %?", handle, status); // Faking a borrowed pointer to our ~SharedChan let timer_done_ch_ptr: &*c_void = &uv::ll::get_data_for_uv_handle( handle); let timer_done_ch_ptr = transmute::<&*c_void, &~SharedChan<()>>( timer_done_ch_ptr); let stop_result = uv::ll::timer_stop(handle); if (stop_result == 0i32) { timer_done_ch_ptr.send(()); uv::ll::close(handle, delayed_send_close_cb); } else { let loop_ptr = uv::ll::get_loop_for_uv_handle(handle); let error_msg = uv::ll::get_last_err_info(loop_ptr); fail!(~"timer::sleep() init failed: "+error_msg); } } } extern fn delayed_send_close_cb(handle: *uv::ll::uv_timer_t) { unsafe { debug!("delayed_send_close_cb handle %?", handle); let timer_done_ch_ptr = uv::ll::get_data_for_uv_handle(handle); let timer_done_ch = transmute::<*c_void, ~SharedChan<()>>( timer_done_ch_ptr); timer_done_ch.send(()); } } #[cfg(test)] mod test { use core::prelude::*; use timer::*; use uv; use core::iter; use core::rand::RngUtil; use core::rand; use core::task; use core::pipes::{stream, SharedChan}; #[test] pub fn test_gl_timer_simple_sleep_test() { let hl_loop = &uv::global_loop::get(); sleep(hl_loop, 1u); } #[test] pub fn test_gl_timer_sleep_stress1() { let hl_loop = &uv::global_loop::get(); for iter::repeat(50u) { sleep(hl_loop, 1u); } } #[test] pub fn test_gl_timer_sleep_stress2() { let (po, ch) = stream(); let ch = SharedChan(ch); let hl_loop = &uv::global_loop::get(); let repeat = 20u; let spec = { ~[(1u, 20u), (10u, 10u), (20u, 2u)] }; for iter::repeat(repeat) { let ch = ch.clone(); for spec.each |spec| { let (times, maxms) = *spec; let ch = ch.clone(); let hl_loop_clone = hl_loop.clone(); do task::spawn { use core::rand::*; let rng = Rng(); for iter::repeat(times) { sleep(&hl_loop_clone, rng.next() as uint % maxms); } ch.send(()); } } } for iter::repeat(repeat * spec.len()) { po.recv() } } // Because valgrind serializes multithreaded programs it can // make timing-sensitive tests fail in wierd ways. In these // next test we run them many times and expect them to pass // the majority of tries. #[test] #[cfg(ignore)] pub fn test_gl_timer_recv_timeout_before_time_passes() { let times = 100; let mut successes = 0; let mut failures = 0; let hl_loop = uv::global_loop::get(); for iter::repeat(times as uint) { task::yield(); let expected = rand::rng().gen_str(16u); let (test_po, test_ch) = stream::<~str>(); do task::spawn() { delayed_send(hl_loop, 1u, &test_ch, expected); }; match recv_timeout(hl_loop, 10u, &test_po) { Some(val) => { assert!(val == expected); successes += 1; } _ => failures += 1 }; } assert!(successes > times / 2); } #[test] pub fn test_gl_timer_recv_timeout_after_time_passes() { let times = 100; let mut successes = 0; let mut failures = 0; let hl_loop = uv::global_loop::get(); for iter::repeat(times as uint) { let expected = rand::Rng().gen_str(16u); let (test_po, test_ch) = stream::<~str>(); let hl_loop_clone = hl_loop.clone(); do task::spawn() { delayed_send(&hl_loop_clone, 50u, &test_ch, expected); }; match recv_timeout(&hl_loop, 1u, &test_po) { None => successes += 1, _ => failures += 1 }; } assert!(successes > times / 2); } }