extra: Remove uv, net, timer code
This will all be reimplemented in the new runtime.
This commit is contained in:
parent
2f7d86f9a8
commit
219c1c71da
@ -36,23 +36,9 @@ use std::str::{StrSlice, OwnedStr};
|
||||
|
||||
pub use std::os;
|
||||
|
||||
pub mod uv_ll;
|
||||
|
||||
// General io and system-services modules
|
||||
|
||||
#[path = "net/mod.rs"]
|
||||
pub mod net;
|
||||
|
||||
// libuv modules
|
||||
pub mod uv;
|
||||
pub mod uv_iotask;
|
||||
pub mod uv_global_loop;
|
||||
|
||||
|
||||
// Utility modules
|
||||
|
||||
pub mod c_vec;
|
||||
pub mod timer;
|
||||
pub mod io_util;
|
||||
pub mod rc;
|
||||
|
||||
|
@ -639,7 +639,6 @@ mod test {
|
||||
use flatpipes::serial;
|
||||
use io_util::BufReader;
|
||||
use flatpipes::{BytePort, FlatChan, FlatPort};
|
||||
use net::tcp::TcpSocketBuf;
|
||||
|
||||
use std::comm;
|
||||
use std::int;
|
||||
@ -728,7 +727,8 @@ mod test {
|
||||
}
|
||||
|
||||
// FIXME #2064: Networking doesn't work on x86
|
||||
#[test]
|
||||
// XXX Broken until networking support is added back
|
||||
/*#[test]
|
||||
#[cfg(target_arch = "x86_64")]
|
||||
fn test_pod_tcp_stream() {
|
||||
fn reader_port(buf: TcpSocketBuf
|
||||
@ -745,6 +745,7 @@ mod test {
|
||||
#[test]
|
||||
#[cfg(target_arch = "x86_64")]
|
||||
fn test_serializing_tcp_stream() {
|
||||
// XXX Broken until networking support is added back
|
||||
fn reader_port(buf: TcpSocketBuf
|
||||
) -> serial::ReaderPort<int, TcpSocketBuf> {
|
||||
serial::reader_port(buf)
|
||||
@ -860,7 +861,7 @@ mod test {
|
||||
}
|
||||
|
||||
finish_port.recv();
|
||||
}
|
||||
}*/
|
||||
|
||||
// Tests that the different backends behave the same when the
|
||||
// binary streaming protocol is broken
|
||||
|
@ -1,452 +0,0 @@
|
||||
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
|
||||
// file at the top-level directory of this distribution and at
|
||||
// http://rust-lang.org/COPYRIGHT.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
||||
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
||||
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
//! Types/fns concerning Internet Protocol (IP), versions 4 & 6
|
||||
|
||||
#[allow(missing_doc)];
|
||||
|
||||
|
||||
use std::libc;
|
||||
use std::comm::{stream, SharedChan};
|
||||
use std::ptr;
|
||||
use std::result;
|
||||
use std::str;
|
||||
|
||||
use iotask = uv::iotask::IoTask;
|
||||
use interact = uv::iotask::interact;
|
||||
|
||||
use sockaddr_in = uv_ll::sockaddr_in;
|
||||
use sockaddr_in6 = uv_ll::sockaddr_in6;
|
||||
use addrinfo = uv_ll::addrinfo;
|
||||
use uv_getaddrinfo_t = uv_ll::uv_getaddrinfo_t;
|
||||
use uv_ip4_name = uv_ll::ip4_name;
|
||||
use uv_ip4_port = uv_ll::ip4_port;
|
||||
use uv_ip6_name = uv_ll::ip6_name;
|
||||
use uv_ip6_port = uv_ll::ip6_port;
|
||||
use uv_getaddrinfo = uv_ll::getaddrinfo;
|
||||
use uv_freeaddrinfo = uv_ll::freeaddrinfo;
|
||||
use create_uv_getaddrinfo_t = uv_ll::getaddrinfo_t;
|
||||
use set_data_for_req = uv_ll::set_data_for_req;
|
||||
use get_data_for_req = uv_ll::get_data_for_req;
|
||||
use ll = uv_ll;
|
||||
|
||||
/// An IP address
|
||||
#[deriving(Clone)]
|
||||
pub enum IpAddr {
|
||||
/// An IPv4 address
|
||||
Ipv4(sockaddr_in),
|
||||
Ipv6(sockaddr_in6)
|
||||
}
|
||||
|
||||
/// Human-friendly feedback on why a parse_addr attempt failed
|
||||
pub struct ParseAddrErr {
|
||||
err_msg: ~str,
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a `IpAddr` to a str
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
* * ip - a `extra::net::ip::IpAddr`
|
||||
*/
|
||||
pub fn format_addr(ip: &IpAddr) -> ~str {
|
||||
match *ip {
|
||||
Ipv4(ref addr) => unsafe {
|
||||
let result = uv_ip4_name(addr);
|
||||
if result == ~"" {
|
||||
fail!("failed to convert inner sockaddr_in address to str")
|
||||
}
|
||||
result
|
||||
},
|
||||
Ipv6(ref addr) => unsafe {
|
||||
let result = uv_ip6_name(addr);
|
||||
if result == ~"" {
|
||||
fail!("failed to convert inner sockaddr_in address to str")
|
||||
}
|
||||
result
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the associated port
|
||||
*
|
||||
* # Arguments
|
||||
* * ip - a `extra::net::ip::IpAddr`
|
||||
*/
|
||||
pub fn get_port(ip: &IpAddr) -> uint {
|
||||
match *ip {
|
||||
Ipv4(ref addr) => unsafe {
|
||||
uv_ip4_port(addr)
|
||||
},
|
||||
Ipv6(ref addr) => unsafe {
|
||||
uv_ip6_port(addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents errors returned from `net::ip::get_addr()`
|
||||
enum IpGetAddrErr {
|
||||
GetAddrUnknownError
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts name resolution on the provided `node` string
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
* * `node` - a string representing some host address
|
||||
* * `iotask` - a `uv::iotask` used to interact with the underlying event loop
|
||||
*
|
||||
* # Returns
|
||||
*
|
||||
* A `result<~[ip_addr], ip_get_addr_err>` instance that will contain
|
||||
* a vector of `ip_addr` results, in the case of success, or an error
|
||||
* object in the case of failure
|
||||
*/
|
||||
pub fn get_addr(node: &str, iotask: &iotask)
|
||||
-> result::Result<~[IpAddr], IpGetAddrErr> {
|
||||
let (output_po, output_ch) = stream();
|
||||
let mut output_ch = Some(SharedChan::new(output_ch));
|
||||
do str::as_buf(node) |node_ptr, len| {
|
||||
let output_ch = output_ch.take_unwrap();
|
||||
debug!("slice len %?", len);
|
||||
let handle = create_uv_getaddrinfo_t();
|
||||
let handle_ptr: *uv_getaddrinfo_t = &handle;
|
||||
let handle_data = GetAddrData {
|
||||
output_ch: output_ch.clone()
|
||||
};
|
||||
let handle_data_ptr: *GetAddrData = &handle_data;
|
||||
do interact(iotask) |loop_ptr| {
|
||||
unsafe {
|
||||
let result = uv_getaddrinfo(
|
||||
loop_ptr,
|
||||
handle_ptr,
|
||||
get_addr_cb,
|
||||
node_ptr,
|
||||
ptr::null(),
|
||||
ptr::null());
|
||||
match result {
|
||||
0i32 => {
|
||||
set_data_for_req(handle_ptr, handle_data_ptr);
|
||||
}
|
||||
_ => {
|
||||
output_ch.send(result::Err(GetAddrUnknownError));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
output_po.recv()
|
||||
}
|
||||
}
|
||||
|
||||
pub mod v4 {
|
||||
|
||||
use net::ip::{IpAddr, Ipv4, ParseAddrErr};
|
||||
use uv::ll;
|
||||
use uv_ip4_addr = uv::ll::ip4_addr;
|
||||
use uv_ip4_name = uv::ll::ip4_name;
|
||||
|
||||
use std::cast::transmute;
|
||||
use std::result;
|
||||
use std::uint;
|
||||
|
||||
/**
|
||||
* Convert a str to `ip_addr`
|
||||
*
|
||||
* # Failure
|
||||
*
|
||||
* Fails if the string is not a valid IPv4 address
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
* * ip - a string of the format `x.x.x.x`
|
||||
*
|
||||
* # Returns
|
||||
*
|
||||
* * an `ip_addr` of the `ipv4` variant
|
||||
*/
|
||||
pub fn parse_addr(ip: &str) -> IpAddr {
|
||||
match try_parse_addr(ip) {
|
||||
result::Ok(addr) => addr,
|
||||
result::Err(ref err_data) => fail!(err_data.err_msg.clone())
|
||||
}
|
||||
}
|
||||
|
||||
// the simple, old style numberic representation of
|
||||
// ipv4
|
||||
#[deriving(Clone)]
|
||||
pub struct Ipv4Rep {
|
||||
a: u8,
|
||||
b: u8,
|
||||
c: u8,
|
||||
d: u8,
|
||||
}
|
||||
|
||||
pub trait AsUnsafeU32 {
|
||||
unsafe fn as_u32(&self) -> u32;
|
||||
}
|
||||
|
||||
impl AsUnsafeU32 for Ipv4Rep {
|
||||
// this is pretty dastardly, i know
|
||||
unsafe fn as_u32(&self) -> u32 {
|
||||
let this: &mut u32 = transmute(self);
|
||||
*this
|
||||
}
|
||||
}
|
||||
pub fn parse_to_ipv4_rep(ip: &str) -> result::Result<Ipv4Rep, ~str> {
|
||||
let parts: ~[uint] = ip.split_iter('.').transform(|s| {
|
||||
match uint::from_str(s) {
|
||||
Some(n) if n <= 255 => n,
|
||||
_ => 256
|
||||
}
|
||||
}).collect();
|
||||
if parts.len() != 4 {
|
||||
Err(fmt!("'%s' doesn't have 4 parts", ip))
|
||||
} else if parts.iter().any(|x| *x == 256u) {
|
||||
Err(fmt!("invalid octal in addr '%s'", ip))
|
||||
} else {
|
||||
Ok(Ipv4Rep {
|
||||
a: parts[0] as u8, b: parts[1] as u8,
|
||||
c: parts[2] as u8, d: parts[3] as u8,
|
||||
})
|
||||
}
|
||||
}
|
||||
pub fn try_parse_addr(ip: &str) -> result::Result<IpAddr,ParseAddrErr> {
|
||||
unsafe {
|
||||
let INADDR_NONE = ll::get_INADDR_NONE();
|
||||
let ip_rep_result = parse_to_ipv4_rep(ip);
|
||||
if result::is_err(&ip_rep_result) {
|
||||
let err_str = result::get_err(&ip_rep_result);
|
||||
return result::Err(ParseAddrErr { err_msg: err_str })
|
||||
}
|
||||
// ipv4_rep.as_u32 is unsafe :/
|
||||
let input_is_inaddr_none =
|
||||
result::get(&ip_rep_result).as_u32() == INADDR_NONE;
|
||||
|
||||
let new_addr = uv_ip4_addr(ip, 22);
|
||||
let reformatted_name = uv_ip4_name(&new_addr);
|
||||
debug!("try_parse_addr: input ip: %s reparsed ip: %s",
|
||||
ip, reformatted_name);
|
||||
let ref_ip_rep_result = parse_to_ipv4_rep(reformatted_name);
|
||||
if result::is_err(&ref_ip_rep_result) {
|
||||
let err_str = result::get_err(&ref_ip_rep_result);
|
||||
return Err(ParseAddrErr { err_msg: err_str })
|
||||
}
|
||||
|
||||
if result::get(&ref_ip_rep_result).as_u32() == INADDR_NONE &&
|
||||
!input_is_inaddr_none {
|
||||
Err(ParseAddrErr {
|
||||
err_msg: ~"uv_ip4_name produced invalid result.",
|
||||
})
|
||||
} else {
|
||||
Ok(Ipv4(new_addr))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
pub mod v6 {
|
||||
|
||||
use net::ip::{IpAddr, Ipv6, ParseAddrErr};
|
||||
use uv_ip6_addr = uv::ll::ip6_addr;
|
||||
use uv_ip6_name = uv::ll::ip6_name;
|
||||
|
||||
use std::result;
|
||||
|
||||
/**
|
||||
* Convert a str to `ip_addr`
|
||||
*
|
||||
* # Failure
|
||||
*
|
||||
* Fails if the string is not a valid IPv6 address
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
* * ip - an ipv6 string. See RFC2460 for spec.
|
||||
*
|
||||
* # Returns
|
||||
*
|
||||
* * an `ip_addr` of the `ipv6` variant
|
||||
*/
|
||||
pub fn parse_addr(ip: &str) -> IpAddr {
|
||||
match try_parse_addr(ip) {
|
||||
result::Ok(addr) => addr,
|
||||
result::Err(err_data) => fail!(err_data.err_msg.clone())
|
||||
}
|
||||
}
|
||||
pub fn try_parse_addr(ip: &str) -> result::Result<IpAddr,ParseAddrErr> {
|
||||
unsafe {
|
||||
// need to figure out how to establish a parse failure..
|
||||
let new_addr = uv_ip6_addr(ip, 22);
|
||||
let reparsed_name = uv_ip6_name(&new_addr);
|
||||
debug!("v6::try_parse_addr ip: '%s' reparsed '%s'",
|
||||
ip, reparsed_name);
|
||||
// '::' appears to be uv_ip6_name() returns for bogus
|
||||
// parses..
|
||||
if ip != &"::" && reparsed_name == ~"::" {
|
||||
Err(ParseAddrErr { err_msg:fmt!("failed to parse '%s'", ip) })
|
||||
}
|
||||
else {
|
||||
Ok(Ipv6(new_addr))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct GetAddrData {
|
||||
output_ch: SharedChan<result::Result<~[IpAddr],IpGetAddrErr>>
|
||||
}
|
||||
|
||||
extern fn get_addr_cb(handle: *uv_getaddrinfo_t,
|
||||
status: libc::c_int,
|
||||
res: *addrinfo) {
|
||||
unsafe {
|
||||
debug!("in get_addr_cb");
|
||||
let handle_data = get_data_for_req(handle) as
|
||||
*GetAddrData;
|
||||
let output_ch = (*handle_data).output_ch.clone();
|
||||
if status == 0i32 {
|
||||
if res != (ptr::null::<addrinfo>()) {
|
||||
let mut out_vec = ~[];
|
||||
debug!("initial addrinfo: %?", res);
|
||||
let mut curr_addr = res;
|
||||
loop {
|
||||
let new_ip_addr = if ll::is_ipv4_addrinfo(curr_addr) {
|
||||
Ipv4(*ll::addrinfo_as_sockaddr_in(curr_addr))
|
||||
}
|
||||
else if ll::is_ipv6_addrinfo(curr_addr) {
|
||||
Ipv6(*ll::addrinfo_as_sockaddr_in6(curr_addr))
|
||||
}
|
||||
else {
|
||||
debug!("curr_addr is not of family AF_INET or \
|
||||
AF_INET6. Error.");
|
||||
output_ch.send(
|
||||
result::Err(GetAddrUnknownError));
|
||||
break;
|
||||
};
|
||||
out_vec.push(new_ip_addr);
|
||||
|
||||
let next_addr = ll::get_next_addrinfo(curr_addr);
|
||||
if next_addr == ptr::null::<addrinfo>() as *addrinfo {
|
||||
debug!("null next_addr encountered. no mas");
|
||||
break;
|
||||
}
|
||||
else {
|
||||
curr_addr = next_addr;
|
||||
debug!("next_addr addrinfo: %?", curr_addr);
|
||||
}
|
||||
}
|
||||
debug!("successful process addrinfo result, len: %?",
|
||||
out_vec.len());
|
||||
output_ch.send(result::Ok(out_vec));
|
||||
}
|
||||
else {
|
||||
debug!("addrinfo pointer is NULL");
|
||||
output_ch.send(
|
||||
result::Err(GetAddrUnknownError));
|
||||
}
|
||||
}
|
||||
else {
|
||||
debug!("status != 0 error in get_addr_cb");
|
||||
output_ch.send(
|
||||
result::Err(GetAddrUnknownError));
|
||||
}
|
||||
if res != (ptr::null::<addrinfo>()) {
|
||||
uv_freeaddrinfo(res);
|
||||
}
|
||||
debug!("leaving get_addr_cb");
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
|
||||
use net::ip::*;
|
||||
use net::ip::v4;
|
||||
use net::ip::v6;
|
||||
use uv;
|
||||
|
||||
use std::result;
|
||||
|
||||
#[test]
|
||||
fn test_ip_ipv4_parse_and_format_ip() {
|
||||
let localhost_str = ~"127.0.0.1";
|
||||
assert!(format_addr(&v4::parse_addr(localhost_str))
|
||||
== localhost_str)
|
||||
}
|
||||
#[test]
|
||||
fn test_ip_ipv6_parse_and_format_ip() {
|
||||
let localhost_str = ~"::1";
|
||||
let format_result = format_addr(&v6::parse_addr(localhost_str));
|
||||
debug!("results: expected: '%s' actual: '%s'",
|
||||
localhost_str, format_result);
|
||||
assert_eq!(format_result, localhost_str);
|
||||
}
|
||||
#[test]
|
||||
fn test_ip_ipv4_bad_parse() {
|
||||
match v4::try_parse_addr("b4df00d") {
|
||||
result::Err(ref err_info) => {
|
||||
debug!("got error as expected %?", err_info);
|
||||
assert!(true);
|
||||
}
|
||||
result::Ok(ref addr) => {
|
||||
fail!("Expected failure, but got addr %?", addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
#[ignore(target_os="win32")]
|
||||
fn test_ip_ipv6_bad_parse() {
|
||||
match v6::try_parse_addr("::,~2234k;") {
|
||||
result::Err(ref err_info) => {
|
||||
debug!("got error as expected %?", err_info);
|
||||
assert!(true);
|
||||
}
|
||||
result::Ok(ref addr) => {
|
||||
fail!("Expected failure, but got addr %?", addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
#[ignore(reason = "valgrind says it's leaky")]
|
||||
fn test_ip_get_addr() {
|
||||
let localhost_name = ~"localhost";
|
||||
let iotask = &uv::global_loop::get();
|
||||
let ga_result = get_addr(localhost_name, iotask);
|
||||
if result::is_err(&ga_result) {
|
||||
fail!("got err result from net::ip::get_addr();")
|
||||
}
|
||||
// note really sure how to reliably test/assert
|
||||
// this.. mostly just wanting to see it work, atm.
|
||||
let results = result::unwrap(ga_result);
|
||||
debug!("test_get_addr: Number of results for %s: %?",
|
||||
localhost_name, results.len());
|
||||
for results.iter().advance |r| {
|
||||
let ipv_prefix = match *r {
|
||||
Ipv4(_) => ~"IPv4",
|
||||
Ipv6(_) => ~"IPv6"
|
||||
};
|
||||
debug!("test_get_addr: result %s: '%s'",
|
||||
ipv_prefix, format_addr(r));
|
||||
}
|
||||
// at least one result.. this is going to vary from system
|
||||
// to system, based on stuff like the contents of /etc/hosts
|
||||
assert!(!results.is_empty());
|
||||
}
|
||||
#[test]
|
||||
#[ignore(reason = "valgrind says it's leaky")]
|
||||
fn test_ip_get_addr_bad_input() {
|
||||
let localhost_name = ~"sjkl234m,./sdf";
|
||||
let iotask = &uv::global_loop::get();
|
||||
let ga_result = get_addr(localhost_name, iotask);
|
||||
assert!(result::is_err(&ga_result));
|
||||
}
|
||||
}
|
@ -1,25 +0,0 @@
|
||||
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
|
||||
// file at the top-level directory of this distribution and at
|
||||
// http://rust-lang.org/COPYRIGHT.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
||||
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
||||
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
/*!
|
||||
Top-level module for network-related functionality.
|
||||
|
||||
Basically, including this module gives you:
|
||||
|
||||
* `tcp`
|
||||
* `ip`
|
||||
* `url`
|
||||
|
||||
See each of those three modules for documentation on what they do.
|
||||
*/
|
||||
|
||||
pub mod tcp;
|
||||
pub mod ip;
|
||||
pub mod url;
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -1,296 +0,0 @@
|
||||
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
|
||||
// file at the top-level directory of this distribution and at
|
||||
// http://rust-lang.org/COPYRIGHT.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
||||
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
||||
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
//! Utilities that leverage libuv's `uv_timer_*` API
|
||||
|
||||
|
||||
use uv;
|
||||
use uv::iotask;
|
||||
use uv::iotask::IoTask;
|
||||
|
||||
use std::cast::transmute;
|
||||
use std::cast;
|
||||
use std::comm::{stream, Chan, SharedChan, Port, select2i};
|
||||
use std::either;
|
||||
use std::libc::c_void;
|
||||
use std::libc;
|
||||
|
||||
/**
|
||||
* Wait for timeout period then send provided value over a channel
|
||||
*
|
||||
* This call returns immediately. Useful as the building block for a number
|
||||
* of higher-level timer functions.
|
||||
*
|
||||
* Is not guaranteed to wait for exactly the specified time, but will wait
|
||||
* for *at least* that period of time.
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
* * `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on
|
||||
* * msecs - a timeout period, in milliseconds, to wait
|
||||
* * ch - a channel of type T to send a `val` on
|
||||
* * val - a value of type T to send over the provided `ch`
|
||||
*/
|
||||
pub fn delayed_send<T:Send>(iotask: &IoTask,
|
||||
msecs: uint,
|
||||
ch: &Chan<T>,
|
||||
val: T) {
|
||||
let (timer_done_po, timer_done_ch) = stream::<()>();
|
||||
let timer_done_ch = SharedChan::new(timer_done_ch);
|
||||
let timer = uv::ll::timer_t();
|
||||
let timer_ptr: *uv::ll::uv_timer_t = &timer;
|
||||
do iotask::interact(iotask) |loop_ptr| {
|
||||
unsafe {
|
||||
let init_result = uv::ll::timer_init(loop_ptr, timer_ptr);
|
||||
if (init_result == 0i32) {
|
||||
let start_result = uv::ll::timer_start(
|
||||
timer_ptr, delayed_send_cb, msecs, 0u);
|
||||
if (start_result == 0i32) {
|
||||
// Note: putting the channel into a ~
|
||||
// to cast to *c_void
|
||||
let timer_done_ch_clone = ~timer_done_ch.clone();
|
||||
let timer_done_ch_ptr = transmute::<
|
||||
~SharedChan<()>, *c_void>(
|
||||
timer_done_ch_clone);
|
||||
uv::ll::set_data_for_uv_handle(
|
||||
timer_ptr,
|
||||
timer_done_ch_ptr);
|
||||
} else {
|
||||
let error_msg = uv::ll::get_last_err_info(
|
||||
loop_ptr);
|
||||
fail!("timer::delayed_send() start failed: %s", error_msg);
|
||||
}
|
||||
} else {
|
||||
let error_msg = uv::ll::get_last_err_info(loop_ptr);
|
||||
fail!("timer::delayed_send() init failed: %s", error_msg);
|
||||
}
|
||||
}
|
||||
};
|
||||
// delayed_send_cb has been processed by libuv
|
||||
timer_done_po.recv();
|
||||
// notify the caller immediately
|
||||
ch.send(val);
|
||||
// uv_close for this timer has been processed
|
||||
timer_done_po.recv();
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocks the current task for (at least) the specified time period.
|
||||
*
|
||||
* Is not guaranteed to sleep for exactly the specified time, but will sleep
|
||||
* for *at least* that period of time.
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
* * `iotask` - a `uv::iotask` that the tcp request will run on
|
||||
* * msecs - an amount of time, in milliseconds, for the current task to block
|
||||
*/
|
||||
pub fn sleep(iotask: &IoTask, msecs: uint) {
|
||||
let (exit_po, exit_ch) = stream::<()>();
|
||||
delayed_send(iotask, msecs, &exit_ch, ());
|
||||
exit_po.recv();
|
||||
}
|
||||
|
||||
/**
|
||||
* Receive on a port for (up to) a specified time, then return an `Option<T>`
|
||||
*
|
||||
* This call will block to receive on the provided port for up to the
|
||||
* specified timeout. Depending on whether the provided port receives in that
|
||||
* time period, `recv_timeout` will return an `Option<T>` representing the
|
||||
* result.
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
* * `iotask' - `uv::iotask` that the tcp request will run on
|
||||
* * msecs - an mount of time, in milliseconds, to wait to receive
|
||||
* * wait_port - a `std::comm::port<T>` to receive on
|
||||
*
|
||||
* # Returns
|
||||
*
|
||||
* An `Option<T>` representing the outcome of the call. If the call `recv`'d
|
||||
* on the provided port in the allotted timeout period, then the result will
|
||||
* be a `Some(T)`. If not, then `None` will be returned.
|
||||
*/
|
||||
pub fn recv_timeout<T:Send>(iotask: &IoTask, msecs: uint, wait_po: &Port<T>)
|
||||
-> Option<T> {
|
||||
let (timeout_po, timeout_ch) = stream::<()>();
|
||||
let mut timeout_po = timeout_po;
|
||||
delayed_send(iotask, msecs, &timeout_ch, ());
|
||||
|
||||
// XXX: Workaround due to ports and channels not being &mut. They should
|
||||
// be.
|
||||
unsafe {
|
||||
let wait_po = cast::transmute_mut(wait_po);
|
||||
|
||||
either::either(
|
||||
|_| {
|
||||
None
|
||||
}, |_| {
|
||||
Some(wait_po.recv())
|
||||
}, &select2i(&mut timeout_po, wait_po)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// INTERNAL API
|
||||
extern fn delayed_send_cb(handle: *uv::ll::uv_timer_t, status: libc::c_int) {
|
||||
unsafe {
|
||||
debug!(
|
||||
"delayed_send_cb handle %? status %?", handle, status);
|
||||
// Faking a borrowed pointer to our ~SharedChan
|
||||
let timer_done_ch_ptr: &*c_void = &uv::ll::get_data_for_uv_handle(
|
||||
handle);
|
||||
let timer_done_ch_ptr = transmute::<&*c_void, &~SharedChan<()>>(
|
||||
timer_done_ch_ptr);
|
||||
let stop_result = uv::ll::timer_stop(handle);
|
||||
if (stop_result == 0i32) {
|
||||
timer_done_ch_ptr.send(());
|
||||
uv::ll::close(handle, delayed_send_close_cb);
|
||||
} else {
|
||||
let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
|
||||
let error_msg = uv::ll::get_last_err_info(loop_ptr);
|
||||
fail!("timer::sleep() init failed: %s", error_msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
extern fn delayed_send_close_cb(handle: *uv::ll::uv_timer_t) {
|
||||
unsafe {
|
||||
debug!("delayed_send_close_cb handle %?", handle);
|
||||
let timer_done_ch_ptr = uv::ll::get_data_for_uv_handle(handle);
|
||||
let timer_done_ch = transmute::<*c_void, ~SharedChan<()>>(
|
||||
timer_done_ch_ptr);
|
||||
timer_done_ch.send(());
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
|
||||
use timer::*;
|
||||
use uv;
|
||||
|
||||
use std::cell::Cell;
|
||||
use std::pipes::{stream, SharedChan};
|
||||
use std::rand::RngUtil;
|
||||
use std::rand;
|
||||
use std::task;
|
||||
|
||||
#[test]
|
||||
fn test_gl_timer_simple_sleep_test() {
|
||||
let hl_loop = &uv::global_loop::get();
|
||||
sleep(hl_loop, 1u);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_gl_timer_sleep_stress1() {
|
||||
let hl_loop = &uv::global_loop::get();
|
||||
for 50u.times {
|
||||
sleep(hl_loop, 1u);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_gl_timer_sleep_stress2() {
|
||||
let (po, ch) = stream();
|
||||
let ch = SharedChan::new(ch);
|
||||
let hl_loop = &uv::global_loop::get();
|
||||
|
||||
let repeat = 20u;
|
||||
let spec = {
|
||||
|
||||
~[(1u, 20u),
|
||||
(10u, 10u),
|
||||
(20u, 2u)]
|
||||
|
||||
};
|
||||
|
||||
for repeat.times {
|
||||
let ch = ch.clone();
|
||||
for spec.iter().advance |spec| {
|
||||
let (times, maxms) = *spec;
|
||||
let ch = ch.clone();
|
||||
let hl_loop_clone = hl_loop.clone();
|
||||
do task::spawn {
|
||||
use std::rand::*;
|
||||
let mut rng = rng();
|
||||
for times.times {
|
||||
sleep(&hl_loop_clone, rng.next() as uint % maxms);
|
||||
}
|
||||
ch.send(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (repeat * spec.len()).times {
|
||||
po.recv()
|
||||
}
|
||||
}
|
||||
|
||||
// Because valgrind serializes multithreaded programs it can
|
||||
// make timing-sensitive tests fail in wierd ways. In these
|
||||
// next test we run them many times and expect them to pass
|
||||
// the majority of tries.
|
||||
|
||||
#[test]
|
||||
#[cfg(ignore)]
|
||||
fn test_gl_timer_recv_timeout_before_time_passes() {
|
||||
let times = 100;
|
||||
let mut successes = 0;
|
||||
let mut failures = 0;
|
||||
let hl_loop = uv::global_loop::get();
|
||||
|
||||
for (times as uint).times {
|
||||
task::yield();
|
||||
|
||||
let expected = rand::rng().gen_str(16u);
|
||||
let (test_po, test_ch) = stream::<~str>();
|
||||
|
||||
do task::spawn() {
|
||||
delayed_send(hl_loop, 1u, &test_ch, expected);
|
||||
};
|
||||
|
||||
match recv_timeout(hl_loop, 10u, &test_po) {
|
||||
Some(val) => {
|
||||
assert_eq!(val, expected);
|
||||
successes += 1;
|
||||
}
|
||||
_ => failures += 1
|
||||
};
|
||||
}
|
||||
|
||||
assert!(successes > times / 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_gl_timer_recv_timeout_after_time_passes() {
|
||||
let times = 100;
|
||||
let mut successes = 0;
|
||||
let mut failures = 0;
|
||||
let hl_loop = uv::global_loop::get();
|
||||
|
||||
for (times as uint).times {
|
||||
let mut rng = rand::rng();
|
||||
let expected = Cell::new(rng.gen_str(16u));
|
||||
let (test_po, test_ch) = stream::<~str>();
|
||||
let hl_loop_clone = hl_loop.clone();
|
||||
do task::spawn() {
|
||||
delayed_send(&hl_loop_clone, 50u, &test_ch, expected.take());
|
||||
};
|
||||
|
||||
match recv_timeout(&hl_loop, 1u, &test_po) {
|
||||
None => successes += 1,
|
||||
_ => failures += 1
|
||||
};
|
||||
}
|
||||
|
||||
assert!(successes > times / 2);
|
||||
}
|
||||
}
|
@ -1,38 +0,0 @@
|
||||
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
|
||||
// file at the top-level directory of this distribution and at
|
||||
// http://rust-lang.org/COPYRIGHT.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
||||
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
||||
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
/*!
|
||||
* Rust bindings to libuv
|
||||
*
|
||||
* This is the base-module for various levels of bindings to
|
||||
* the libuv library.
|
||||
*
|
||||
* These modules are seeing heavy work, currently, and the final
|
||||
* API layout should not be inferred from its current form.
|
||||
*
|
||||
* This base module currently contains a historical, rust-based
|
||||
* implementation of a few libuv operations that hews closely to
|
||||
* the patterns of the libuv C-API. It was used, mostly, to explore
|
||||
* some implementation details and will most likely be deprecated
|
||||
* in the near future.
|
||||
*
|
||||
* The `ll` module contains low-level mappings for working directly
|
||||
* with the libuv C-API.
|
||||
*
|
||||
* The `hl` module contains a set of tools library developers can
|
||||
* use for interacting with an active libuv loop. This modules's
|
||||
* API is meant to be used to write high-level,
|
||||
* rust-idiomatic abstractions for utilizes libuv's asynchronous IO
|
||||
* facilities.
|
||||
*/
|
||||
|
||||
pub use ll = super::uv_ll;
|
||||
pub use iotask = uv_iotask;
|
||||
pub use global_loop = uv_global_loop;
|
@ -1,227 +0,0 @@
|
||||
// Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
|
||||
// file at the top-level directory of this distribution and at
|
||||
// http://rust-lang.org/COPYRIGHT.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
||||
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
||||
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
//! A process-wide libuv event loop for library use.
|
||||
|
||||
|
||||
use iotask = uv_iotask;
|
||||
use uv_iotask::{IoTask, spawn_iotask};
|
||||
|
||||
use std::comm::Chan;
|
||||
use std::option::{Some, None};
|
||||
use std::task::task;
|
||||
use std::unstable::global::{global_data_clone_create, global_data_clone};
|
||||
use std::unstable::weak_task::weaken_task;
|
||||
|
||||
/**
|
||||
* Race-free helper to get access to a global task where a libuv
|
||||
* loop is running.
|
||||
*
|
||||
* Use `uv::hl::interact` to do operations against the global
|
||||
* loop that this function returns.
|
||||
*
|
||||
* # Return
|
||||
*
|
||||
* * A `hl::high_level_loop` that encapsulates communication with the global
|
||||
* loop.
|
||||
*/
|
||||
pub fn get() -> IoTask {
|
||||
return get_monitor_task_gl();
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
fn get_monitor_task_gl() -> IoTask {
|
||||
|
||||
type MonChan = Chan<IoTask>;
|
||||
|
||||
struct GlobalIoTask(IoTask);
|
||||
|
||||
impl Clone for GlobalIoTask {
|
||||
fn clone(&self) -> GlobalIoTask {
|
||||
GlobalIoTask((**self).clone())
|
||||
}
|
||||
}
|
||||
|
||||
fn key(_: GlobalIoTask) { }
|
||||
|
||||
match unsafe { global_data_clone(key) } {
|
||||
Some(GlobalIoTask(iotask)) => iotask,
|
||||
None => {
|
||||
let iotask: IoTask = spawn_loop();
|
||||
let mut installed = false;
|
||||
let final_iotask = unsafe {
|
||||
do global_data_clone_create(key) {
|
||||
installed = true;
|
||||
~GlobalIoTask(iotask.clone())
|
||||
}
|
||||
};
|
||||
if installed {
|
||||
let mut task = task();
|
||||
task.unlinked();
|
||||
do task.spawn {
|
||||
unsafe {
|
||||
debug!("global monitor task starting");
|
||||
// As a weak task the runtime will notify us
|
||||
// when to exit
|
||||
do weaken_task |weak_exit_po| {
|
||||
debug!("global monitor task is weak");
|
||||
weak_exit_po.recv();
|
||||
iotask::exit(&iotask);
|
||||
debug!("global monitor task is unweak");
|
||||
};
|
||||
debug!("global monitor task exiting");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
iotask::exit(&iotask);
|
||||
}
|
||||
|
||||
match final_iotask {
|
||||
GlobalIoTask(iotask) => iotask
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_loop() -> IoTask {
|
||||
let mut builder = task();
|
||||
|
||||
do builder.add_wrapper |task_body| {
|
||||
let result: ~fn() = || {
|
||||
// The I/O loop task also needs to be weak so it doesn't keep
|
||||
// the runtime alive
|
||||
unsafe {
|
||||
do weaken_task |_| {
|
||||
debug!("global libuv task is now weak");
|
||||
task_body();
|
||||
|
||||
// We don't wait for the exit message on weak_exit_po
|
||||
// because the monitor task will tell the uv loop when to
|
||||
// exit
|
||||
|
||||
debug!("global libuv task is leaving weakened state");
|
||||
}
|
||||
}
|
||||
};
|
||||
result
|
||||
};
|
||||
|
||||
builder.unlinked();
|
||||
spawn_iotask(builder)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
|
||||
use get_gl = uv_global_loop::get;
|
||||
use uv::iotask;
|
||||
use uv::ll;
|
||||
use uv_iotask::IoTask;
|
||||
|
||||
use std::libc;
|
||||
use std::task;
|
||||
use std::cast::transmute;
|
||||
use std::libc::c_void;
|
||||
use std::comm::{stream, SharedChan, Chan};
|
||||
|
||||
extern fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) {
|
||||
unsafe {
|
||||
let exit_ch_ptr = ll::get_data_for_uv_handle(
|
||||
timer_ptr as *libc::c_void);
|
||||
let exit_ch = transmute::<*c_void, ~Chan<bool>>(exit_ch_ptr);
|
||||
exit_ch.send(true);
|
||||
debug!("EXIT_CH_PTR simple_timer_close_cb exit_ch_ptr: %?",
|
||||
exit_ch_ptr);
|
||||
}
|
||||
}
|
||||
extern fn simple_timer_cb(timer_ptr: *ll::uv_timer_t,
|
||||
_status: libc::c_int) {
|
||||
unsafe {
|
||||
debug!(~"in simple timer cb");
|
||||
ll::timer_stop(timer_ptr);
|
||||
let hl_loop = &get_gl();
|
||||
do iotask::interact(hl_loop) |_loop_ptr| {
|
||||
debug!(~"closing timer");
|
||||
ll::close(timer_ptr, simple_timer_close_cb);
|
||||
debug!(~"about to deref exit_ch_ptr");
|
||||
debug!(~"after msg sent on deref'd exit_ch");
|
||||
};
|
||||
debug!(~"exiting simple timer cb");
|
||||
}
|
||||
}
|
||||
|
||||
fn impl_uv_hl_simple_timer(iotask: &IoTask) {
|
||||
unsafe {
|
||||
let (exit_po, exit_ch) = stream::<bool>();
|
||||
let exit_ch_ptr: *libc::c_void = transmute(~exit_ch);
|
||||
debug!("EXIT_CH_PTR newly created exit_ch_ptr: %?",
|
||||
exit_ch_ptr);
|
||||
let timer_handle = ll::timer_t();
|
||||
let timer_ptr: *ll::uv_timer_t = &timer_handle;
|
||||
do iotask::interact(iotask) |loop_ptr| {
|
||||
debug!(~"user code inside interact loop!!!");
|
||||
let init_status = ll::timer_init(loop_ptr, timer_ptr);
|
||||
if(init_status == 0i32) {
|
||||
ll::set_data_for_uv_handle(
|
||||
timer_ptr as *libc::c_void,
|
||||
exit_ch_ptr);
|
||||
let start_status = ll::timer_start(timer_ptr,
|
||||
simple_timer_cb,
|
||||
1u, 0u);
|
||||
if(start_status != 0i32) {
|
||||
fail!("failure on ll::timer_start()");
|
||||
}
|
||||
}
|
||||
else {
|
||||
fail!("failure on ll::timer_init()");
|
||||
}
|
||||
};
|
||||
exit_po.recv();
|
||||
debug!(
|
||||
~"global_loop timer test: msg recv on exit_po, done..");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_gl_uv_global_loop_high_level_global_timer() {
|
||||
let hl_loop = &get_gl();
|
||||
let (exit_po, exit_ch) = stream::<()>();
|
||||
task::spawn_sched(task::ManualThreads(1u), || {
|
||||
let hl_loop = &get_gl();
|
||||
impl_uv_hl_simple_timer(hl_loop);
|
||||
exit_ch.send(());
|
||||
});
|
||||
impl_uv_hl_simple_timer(hl_loop);
|
||||
exit_po.recv();
|
||||
}
|
||||
|
||||
// keeping this test ignored until some kind of stress-test-harness
|
||||
// is set up for the build bots
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_stress_gl_uv_global_loop_high_level_global_timer() {
|
||||
let (exit_po, exit_ch) = stream::<()>();
|
||||
let exit_ch = SharedChan::new(exit_ch);
|
||||
let cycles = 5000u;
|
||||
for cycles.times {
|
||||
let exit_ch_clone = exit_ch.clone();
|
||||
task::spawn_sched(task::ManualThreads(1u), || {
|
||||
let hl_loop = &get_gl();
|
||||
impl_uv_hl_simple_timer(hl_loop);
|
||||
exit_ch_clone.send(());
|
||||
});
|
||||
};
|
||||
for cycles.times {
|
||||
exit_po.recv();
|
||||
};
|
||||
debug!("test_stress_gl_uv_global_loop_high_level_global_timer \
|
||||
exiting successfully!");
|
||||
}
|
||||
}
|
@ -1,311 +0,0 @@
|
||||
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
|
||||
// file at the top-level directory of this distribution and at
|
||||
// http://rust-lang.org/COPYRIGHT.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
||||
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
||||
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
/*!
|
||||
* A task-based interface to the uv loop
|
||||
*
|
||||
* The I/O task runs in its own single-threaded scheduler. By using the
|
||||
* `interact` function you can execute code in a uv callback.
|
||||
*/
|
||||
|
||||
#[allow(missing_doc)];
|
||||
|
||||
|
||||
use ll = uv_ll;
|
||||
|
||||
use std::comm::{stream, Port, Chan, SharedChan};
|
||||
use std::libc::c_void;
|
||||
use std::libc;
|
||||
use std::task;
|
||||
|
||||
/// Used to abstract-away direct interaction with a libuv loop.
|
||||
pub struct IoTask {
|
||||
async_handle: *ll::uv_async_t,
|
||||
op_chan: SharedChan<IoTaskMsg>
|
||||
}
|
||||
|
||||
impl Clone for IoTask {
|
||||
fn clone(&self) -> IoTask {
|
||||
IoTask{
|
||||
async_handle: self.async_handle,
|
||||
op_chan: self.op_chan.clone()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_iotask(mut task: task::TaskBuilder) -> IoTask {
|
||||
let (iotask_port, iotask_chan) = stream();
|
||||
|
||||
task.sched_mode(task::SingleThreaded);
|
||||
do task.spawn {
|
||||
debug!("entering libuv task");
|
||||
run_loop(&iotask_chan);
|
||||
debug!("libuv task exiting");
|
||||
};
|
||||
|
||||
iotask_port.recv()
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Provide a callback to be processed by `iotask`
|
||||
*
|
||||
* The primary way to do operations again a running `iotask` that
|
||||
* doesn't involve creating a uv handle via `safe_handle`
|
||||
*
|
||||
* # Warning
|
||||
*
|
||||
* This function is the only safe way to interact with _any_ `iotask`.
|
||||
* Using functions in the `uv::ll` module outside of the `cb` passed into
|
||||
* this function is _very dangerous_.
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
* * iotask - a uv I/O task that you want to do operations against
|
||||
* * cb - a function callback to be processed on the running loop's
|
||||
* thread. The only parameter passed in is an opaque pointer representing the
|
||||
* running `uv_loop_t*`. In the context of this callback, it is safe to use
|
||||
* this pointer to do various uv_* API calls contained within the `uv::ll`
|
||||
* module. It is not safe to send the `loop_ptr` param to this callback out
|
||||
* via ports/chans.
|
||||
*/
|
||||
pub fn interact(iotask: &IoTask, cb: ~fn(*c_void)) {
|
||||
send_msg(iotask, Interaction(cb));
|
||||
}
|
||||
|
||||
/**
|
||||
* Shut down the I/O task
|
||||
*
|
||||
* Is used to signal to the loop that it should close the internally-held
|
||||
* async handle and do a sanity check to make sure that all other handles are
|
||||
* closed, causing a failure otherwise.
|
||||
*/
|
||||
pub fn exit(iotask: &IoTask) {
|
||||
send_msg(iotask, TeardownLoop);
|
||||
}
|
||||
|
||||
|
||||
// INTERNAL API
|
||||
|
||||
enum IoTaskMsg {
|
||||
Interaction(~fn(*libc::c_void)),
|
||||
TeardownLoop
|
||||
}
|
||||
|
||||
/// Run the loop and begin handling messages
|
||||
fn run_loop(iotask_ch: &Chan<IoTask>) {
|
||||
|
||||
unsafe {
|
||||
debug!("creating loop");
|
||||
let loop_ptr = ll::loop_new();
|
||||
|
||||
// set up the special async handle we'll use to allow multi-task
|
||||
// communication with this loop
|
||||
let async = ll::async_t();
|
||||
let async_handle: *ll::uv_async_t = &async;
|
||||
|
||||
// associate the async handle with the loop
|
||||
ll::async_init(loop_ptr, async_handle, wake_up_cb);
|
||||
|
||||
let (msg_po, msg_ch) = stream::<IoTaskMsg>();
|
||||
|
||||
// initialize our loop data and store it in the loop
|
||||
let data: IoTaskLoopData = IoTaskLoopData {
|
||||
async_handle: async_handle,
|
||||
msg_po: msg_po
|
||||
};
|
||||
ll::set_data_for_uv_handle(async_handle, &data);
|
||||
|
||||
// Send out a handle through which folks can talk to us
|
||||
// while we dwell in the I/O loop
|
||||
let iotask = IoTask {
|
||||
async_handle: async_handle,
|
||||
op_chan: SharedChan::new(msg_ch)
|
||||
};
|
||||
iotask_ch.send(iotask);
|
||||
|
||||
debug!("about to run uv loop");
|
||||
// enter the loop... this blocks until the loop is done..
|
||||
ll::run(loop_ptr);
|
||||
debug!("uv loop ended");
|
||||
ll::loop_delete(loop_ptr);
|
||||
}
|
||||
}
|
||||
|
||||
// data that lives for the lifetime of the high-evel oo
|
||||
struct IoTaskLoopData {
|
||||
async_handle: *ll::uv_async_t,
|
||||
msg_po: Port<IoTaskMsg>,
|
||||
}
|
||||
|
||||
fn send_msg(iotask: &IoTask,
|
||||
msg: IoTaskMsg) {
|
||||
iotask.op_chan.send(msg);
|
||||
unsafe {
|
||||
ll::async_send(iotask.async_handle);
|
||||
}
|
||||
}
|
||||
|
||||
/// Dispatch all pending messages
|
||||
extern fn wake_up_cb(async_handle: *ll::uv_async_t,
|
||||
status: int) {
|
||||
|
||||
debug!("wake_up_cb extern.. handle: %? status: %?",
|
||||
async_handle, status);
|
||||
|
||||
unsafe {
|
||||
let loop_ptr = ll::get_loop_for_uv_handle(async_handle);
|
||||
let data =
|
||||
ll::get_data_for_uv_handle(async_handle) as *IoTaskLoopData;
|
||||
let msg_po = &(*data).msg_po;
|
||||
|
||||
while msg_po.peek() {
|
||||
match msg_po.recv() {
|
||||
Interaction(ref cb) => (*cb)(loop_ptr),
|
||||
TeardownLoop => begin_teardown(data)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn begin_teardown(data: *IoTaskLoopData) {
|
||||
unsafe {
|
||||
debug!("iotask begin_teardown() called, close async_handle");
|
||||
let async_handle = (*data).async_handle;
|
||||
ll::close(async_handle as *c_void, tear_down_close_cb);
|
||||
}
|
||||
}
|
||||
extern fn tear_down_walk_cb(handle: *libc::c_void, arg: *libc::c_void) {
|
||||
debug!("IN TEARDOWN WALK CB");
|
||||
// pretty much, if we still have an active handle and it is *not*
|
||||
// the async handle that facilities global loop communication, we
|
||||
// want to barf out and fail
|
||||
assert_eq!(handle, arg);
|
||||
}
|
||||
|
||||
extern fn tear_down_close_cb(handle: *ll::uv_async_t) {
|
||||
unsafe {
|
||||
let loop_ptr = ll::get_loop_for_uv_handle(handle);
|
||||
debug!("in tear_down_close_cb");
|
||||
ll::walk(loop_ptr, tear_down_walk_cb, handle as *libc::c_void);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
extern fn async_close_cb(handle: *ll::uv_async_t) {
|
||||
unsafe {
|
||||
debug!("async_close_cb handle %?", handle);
|
||||
let exit_ch = &(*(ll::get_data_for_uv_handle(handle)
|
||||
as *AhData)).exit_ch;
|
||||
let exit_ch = exit_ch.clone();
|
||||
exit_ch.send(());
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
extern fn async_handle_cb(handle: *ll::uv_async_t, status: libc::c_int) {
|
||||
unsafe {
|
||||
debug!("async_handle_cb handle %? status %?",handle,status);
|
||||
ll::close(handle, async_close_cb);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
struct AhData {
|
||||
iotask: IoTask,
|
||||
exit_ch: SharedChan<()>
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn impl_uv_iotask_async(iotask: &IoTask) {
|
||||
use std::ptr;
|
||||
|
||||
let async_handle = ll::async_t();
|
||||
let ah_ptr: *ll::uv_async_t = &async_handle;
|
||||
let (exit_po, exit_ch) = stream::<()>();
|
||||
let ah_data = AhData {
|
||||
iotask: iotask.clone(),
|
||||
exit_ch: SharedChan::new(exit_ch)
|
||||
};
|
||||
let ah_data_ptr: *AhData = ptr::to_unsafe_ptr(&ah_data);
|
||||
debug!("about to interact");
|
||||
do interact(iotask) |loop_ptr| {
|
||||
unsafe {
|
||||
debug!("interacting");
|
||||
ll::async_init(loop_ptr, ah_ptr, async_handle_cb);
|
||||
ll::set_data_for_uv_handle(
|
||||
ah_ptr, ah_data_ptr as *libc::c_void);
|
||||
ll::async_send(ah_ptr);
|
||||
}
|
||||
};
|
||||
debug!("waiting for async close");
|
||||
exit_po.recv();
|
||||
}
|
||||
|
||||
// this fn documents the bear minimum necessary to roll your own
|
||||
// high_level_loop
|
||||
#[cfg(test)]
|
||||
fn spawn_test_loop(exit_ch: ~Chan<()>) -> IoTask {
|
||||
let (iotask_port, iotask_ch) = stream::<IoTask>();
|
||||
do task::spawn_sched(task::ManualThreads(1u)) {
|
||||
debug!("about to run a test loop");
|
||||
run_loop(&iotask_ch);
|
||||
exit_ch.send(());
|
||||
};
|
||||
return iotask_port.recv();
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
extern fn lifetime_handle_close(handle: *libc::c_void) {
|
||||
debug!("lifetime_handle_close ptr %?", handle);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
extern fn lifetime_async_callback(handle: *libc::c_void,
|
||||
status: libc::c_int) {
|
||||
debug!("lifetime_handle_close ptr %? status %?",
|
||||
handle, status);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_uv_iotask_async() {
|
||||
let (exit_po, exit_ch) = stream::<()>();
|
||||
let iotask = &spawn_test_loop(~exit_ch);
|
||||
|
||||
debug!("spawned iotask");
|
||||
|
||||
// using this handle to manage the lifetime of the
|
||||
// high_level_loop, as it will exit the first time one of
|
||||
// the impl_uv_hl_async() is cleaned up with no one ref'd
|
||||
// handles on the loop (Which can happen under
|
||||
// race-condition type situations.. this ensures that the
|
||||
// loop lives until, at least, all of the
|
||||
// impl_uv_hl_async() runs have been called, at least.
|
||||
let (work_exit_po, work_exit_ch) = stream::<()>();
|
||||
let work_exit_ch = SharedChan::new(work_exit_ch);
|
||||
for 7u.times {
|
||||
let iotask_clone = iotask.clone();
|
||||
let work_exit_ch_clone = work_exit_ch.clone();
|
||||
do task::spawn_sched(task::ManualThreads(1u)) {
|
||||
debug!("async");
|
||||
impl_uv_iotask_async(&iotask_clone);
|
||||
debug!("done async");
|
||||
work_exit_ch_clone.send(());
|
||||
};
|
||||
};
|
||||
for 7u.times {
|
||||
debug!("waiting");
|
||||
work_exit_po.recv();
|
||||
};
|
||||
debug!(~"sending teardown_loop msg..");
|
||||
exit(iotask);
|
||||
exit_po.recv();
|
||||
debug!(~"after recv on exit_po.. exiting..");
|
||||
}
|
File diff suppressed because it is too large
Load Diff
@ -9,6 +9,7 @@
|
||||
// except according to those terms.
|
||||
|
||||
// xfail-fast
|
||||
// xfail-test needs networking
|
||||
|
||||
extern mod extra;
|
||||
|
||||
|
@ -13,6 +13,7 @@
|
||||
// Make sure that we can detect when one end of the pipe is closed.
|
||||
|
||||
// xfail-win32
|
||||
// xfail-test needs sleep
|
||||
|
||||
extern mod extra;
|
||||
use extra::timer::sleep;
|
||||
|
@ -10,9 +10,6 @@
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
extern mod extra;
|
||||
use extra::timer::sleep;
|
||||
use extra::uv;
|
||||
use std::pipes;
|
||||
|
||||
proto! oneshot (
|
||||
|
@ -12,6 +12,7 @@
|
||||
|
||||
// xfail-pretty
|
||||
// xfail-win32
|
||||
// xfail-test needs sleep
|
||||
|
||||
extern mod extra;
|
||||
use extra::timer::sleep;
|
||||
|
@ -10,6 +10,8 @@
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
// xfail-test needs sleep
|
||||
|
||||
extern mod extra;
|
||||
|
||||
use extra::timer::sleep;
|
||||
|
Loading…
x
Reference in New Issue
Block a user