34ee63e93b
Part of #3853
1981 lines
72 KiB
Rust
1981 lines
72 KiB
Rust
// Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
|
|
// file at the top-level directory of this distribution and at
|
|
// http://rust-lang.org/COPYRIGHT.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
|
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
|
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
|
// option. This file may not be copied, modified, or distributed
|
|
// except according to those terms.
|
|
|
|
//! High-level interface to libuv's TCP functionality
|
|
// FIXME #4425: Need FFI fixes
|
|
|
|
#[allow(missing_doc)];
|
|
|
|
use core::prelude::*;
|
|
|
|
use future;
|
|
use future_spawn = future::spawn;
|
|
use ip = net_ip;
|
|
use uv;
|
|
use uv::iotask;
|
|
use uv::iotask::IoTask;
|
|
|
|
use core::io;
|
|
use core::libc::size_t;
|
|
use core::libc;
|
|
use core::comm::{stream, Port, SharedChan};
|
|
use core::ptr;
|
|
use core::result::{Result};
|
|
use core::result;
|
|
use core::uint;
|
|
use core::vec;
|
|
|
|
pub mod rustrt {
|
|
use core::libc;
|
|
|
|
#[nolink]
|
|
pub extern {
|
|
unsafe fn rust_uv_current_kernel_malloc(size: libc::c_uint)
|
|
-> *libc::c_void;
|
|
unsafe fn rust_uv_current_kernel_free(mem: *libc::c_void);
|
|
unsafe fn rust_uv_helper_uv_tcp_t_size() -> libc::c_uint;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Encapsulates an open TCP/IP connection through libuv
|
|
*
|
|
* `TcpSocket` is non-copyable/sendable and automagically handles closing the
|
|
* underlying libuv data structures when it goes out of scope. This is the
|
|
* data structure that is used for read/write operations over a TCP stream.
|
|
*/
|
|
pub struct TcpSocket {
|
|
socket_data: @TcpSocketData,
|
|
}
|
|
|
|
#[unsafe_destructor]
|
|
impl Drop for TcpSocket {
|
|
fn finalize(&self) {
|
|
tear_down_socket_data(self.socket_data)
|
|
}
|
|
}
|
|
|
|
pub fn TcpSocket(socket_data: @TcpSocketData) -> TcpSocket {
|
|
TcpSocket {
|
|
socket_data: socket_data
|
|
}
|
|
}
|
|
|
|
/**
|
|
* A buffered wrapper for `net::tcp::TcpSocket`
|
|
*
|
|
* It is created with a call to `net::tcp::socket_buf()` and has impls that
|
|
* satisfy both the `io::Reader` and `io::Writer` traits.
|
|
*/
|
|
pub struct TcpSocketBuf {
|
|
data: @mut TcpBufferedSocketData,
|
|
end_of_stream: @mut bool
|
|
}
|
|
|
|
pub fn TcpSocketBuf(data: @mut TcpBufferedSocketData) -> TcpSocketBuf {
|
|
TcpSocketBuf {
|
|
data: data,
|
|
end_of_stream: @mut false
|
|
}
|
|
}
|
|
|
|
/// Contains raw, string-based, error information returned from libuv
|
|
pub struct TcpErrData {
|
|
err_name: ~str,
|
|
err_msg: ~str,
|
|
}
|
|
|
|
/// Details returned as part of a `Result::Err` result from `tcp::listen`
|
|
pub enum TcpListenErrData {
|
|
/**
|
|
* Some unplanned-for error. The first and second fields correspond
|
|
* to libuv's `err_name` and `err_msg` fields, respectively.
|
|
*/
|
|
GenericListenErr(~str, ~str),
|
|
/**
|
|
* Failed to bind to the requested IP/Port, because it is already in use.
|
|
*
|
|
* # Possible Causes
|
|
*
|
|
* * Attempting to bind to a port already bound to another listener
|
|
*/
|
|
AddressInUse,
|
|
/**
|
|
* Request to bind to an IP/Port was denied by the system.
|
|
*
|
|
* # Possible Causes
|
|
*
|
|
* * Attemping to binding to an IP/Port as a non-Administrator
|
|
* on Windows Vista+
|
|
* * Attempting to bind, as a non-priv'd
|
|
* user, to 'privileged' ports (< 1024) on *nix
|
|
*/
|
|
AccessDenied
|
|
}
|
|
/// Details returned as part of a `Result::Err` result from `tcp::connect`
|
|
pub enum TcpConnectErrData {
|
|
/**
|
|
* Some unplanned-for error. The first and second fields correspond
|
|
* to libuv's `err_name` and `err_msg` fields, respectively.
|
|
*/
|
|
GenericConnectErr(~str, ~str),
|
|
/// Invalid IP or invalid port
|
|
ConnectionRefused
|
|
}
|
|
|
|
/**
|
|
* Initiate a client connection over TCP/IP
|
|
*
|
|
* # Arguments
|
|
*
|
|
* * `input_ip` - The IP address (versions 4 or 6) of the remote host
|
|
* * `port` - the unsigned integer of the desired remote host port
|
|
* * `iotask` - a `uv::iotask` that the tcp request will run on
|
|
*
|
|
* # Returns
|
|
*
|
|
* A `result` that, if the operation succeeds, contains a
|
|
* `net::net::TcpSocket` that can be used to send and receive data to/from
|
|
* the remote host. In the event of failure, a
|
|
* `net::tcp::TcpConnectErrData` instance will be returned
|
|
*/
|
|
pub fn connect(input_ip: ip::IpAddr, port: uint,
|
|
iotask: &IoTask)
|
|
-> result::Result<TcpSocket, TcpConnectErrData> {
|
|
unsafe {
|
|
let (result_po, result_ch) = stream::<ConnAttempt>();
|
|
let result_ch = SharedChan::new(result_ch);
|
|
let (closed_signal_po, closed_signal_ch) = stream::<()>();
|
|
let closed_signal_ch = SharedChan::new(closed_signal_ch);
|
|
let conn_data = ConnectReqData {
|
|
result_ch: result_ch,
|
|
closed_signal_ch: closed_signal_ch
|
|
};
|
|
let conn_data_ptr: *ConnectReqData = &conn_data;
|
|
let (reader_po, reader_ch) = stream::<Result<~[u8], TcpErrData>>();
|
|
let reader_ch = SharedChan::new(reader_ch);
|
|
let stream_handle_ptr = malloc_uv_tcp_t();
|
|
*(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
|
|
let socket_data = @TcpSocketData {
|
|
reader_po: @reader_po,
|
|
reader_ch: reader_ch,
|
|
stream_handle_ptr: stream_handle_ptr,
|
|
connect_req: uv::ll::connect_t(),
|
|
write_req: uv::ll::write_t(),
|
|
ipv6: match input_ip {
|
|
ip::Ipv4(_) => { false }
|
|
ip::Ipv6(_) => { true }
|
|
},
|
|
iotask: iotask.clone()
|
|
};
|
|
let socket_data_ptr: *TcpSocketData = &*socket_data;
|
|
// get an unsafe representation of our stream_handle_ptr that
|
|
// we can send into the interact cb to be handled in libuv..
|
|
debug!("stream_handle_ptr outside interact %?",
|
|
stream_handle_ptr);
|
|
do iotask::interact(iotask) |loop_ptr| {
|
|
unsafe {
|
|
debug!("in interact cb for tcp client connect..");
|
|
debug!("stream_handle_ptr in interact %?",
|
|
stream_handle_ptr);
|
|
match uv::ll::tcp_init( loop_ptr, stream_handle_ptr) {
|
|
0i32 => {
|
|
debug!("tcp_init successful");
|
|
debug!("dealing w/ ipv4 connection..");
|
|
let connect_req_ptr: *uv::ll::uv_connect_t =
|
|
&(*socket_data_ptr).connect_req;
|
|
let addr_str = ip::format_addr(&input_ip);
|
|
let connect_result = match input_ip {
|
|
ip::Ipv4(ref addr) => {
|
|
// have to "recreate" the
|
|
// sockaddr_in/6 since the ip_addr
|
|
// discards the port info.. should
|
|
// probably add an additional rust
|
|
// type that actually is closer to
|
|
// what the libuv API expects (ip str
|
|
// + port num)
|
|
debug!("addr: %?", addr);
|
|
let in_addr = uv::ll::ip4_addr(addr_str,
|
|
port as int);
|
|
uv::ll::tcp_connect(
|
|
connect_req_ptr,
|
|
stream_handle_ptr,
|
|
&in_addr,
|
|
tcp_connect_on_connect_cb)
|
|
}
|
|
ip::Ipv6(ref addr) => {
|
|
debug!("addr: %?", addr);
|
|
let in_addr = uv::ll::ip6_addr(addr_str,
|
|
port as int);
|
|
uv::ll::tcp_connect6(
|
|
connect_req_ptr,
|
|
stream_handle_ptr,
|
|
&in_addr,
|
|
tcp_connect_on_connect_cb)
|
|
}
|
|
};
|
|
match connect_result {
|
|
0i32 => {
|
|
debug!("tcp_connect successful: \
|
|
stream %x,
|
|
socket data %x",
|
|
stream_handle_ptr as uint,
|
|
socket_data_ptr as uint);
|
|
// reusable data that we'll have for the
|
|
// duration..
|
|
uv::ll::set_data_for_uv_handle(
|
|
stream_handle_ptr,
|
|
socket_data_ptr as
|
|
*libc::c_void);
|
|
// just so the connect_cb can send the
|
|
// outcome..
|
|
uv::ll::set_data_for_req(connect_req_ptr,
|
|
conn_data_ptr);
|
|
debug!("leaving tcp_connect interact cb...");
|
|
// let tcp_connect_on_connect_cb send on
|
|
// the result_ch, now..
|
|
}
|
|
_ => {
|
|
// immediate connect
|
|
// failure.. probably a garbage ip or
|
|
// somesuch
|
|
let err_data =
|
|
uv::ll::get_last_err_data(loop_ptr);
|
|
let result_ch = (*conn_data_ptr)
|
|
.result_ch.clone();
|
|
result_ch.send(ConnFailure(err_data));
|
|
uv::ll::set_data_for_uv_handle(
|
|
stream_handle_ptr,
|
|
conn_data_ptr);
|
|
uv::ll::close(stream_handle_ptr,
|
|
stream_error_close_cb);
|
|
}
|
|
}
|
|
}
|
|
_ => {
|
|
// failure to create a tcp handle
|
|
let err_data = uv::ll::get_last_err_data(loop_ptr);
|
|
let result_ch = (*conn_data_ptr).result_ch.clone();
|
|
result_ch.send(ConnFailure(err_data));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
match result_po.recv() {
|
|
ConnSuccess => {
|
|
debug!("tcp::connect - received success on result_po");
|
|
result::Ok(TcpSocket(socket_data))
|
|
}
|
|
ConnFailure(ref err_data) => {
|
|
closed_signal_po.recv();
|
|
debug!("tcp::connect - received failure on result_po");
|
|
// still have to free the malloc'd stream handle..
|
|
rustrt::rust_uv_current_kernel_free(stream_handle_ptr
|
|
as *libc::c_void);
|
|
let tcp_conn_err = match err_data.err_name {
|
|
~"ECONNREFUSED" => ConnectionRefused,
|
|
_ => GenericConnectErr(copy err_data.err_name,
|
|
copy err_data.err_msg)
|
|
};
|
|
result::Err(tcp_conn_err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Write binary data to a tcp stream; Blocks until operation completes
|
|
*
|
|
* # Arguments
|
|
*
|
|
* * sock - a `TcpSocket` to write to
|
|
* * raw_write_data - a vector of `~[u8]` that will be written to the stream.
|
|
* This value must remain valid for the duration of the `write` call
|
|
*
|
|
* # Returns
|
|
*
|
|
* A `Result` object with a `()` value as the `Ok` variant, or a
|
|
* `TcpErrData` value as the `Err` variant
|
|
*/
|
|
pub fn write(sock: &TcpSocket, raw_write_data: ~[u8])
|
|
-> result::Result<(), TcpErrData> {
|
|
let socket_data_ptr: *TcpSocketData = &*sock.socket_data;
|
|
write_common_impl(socket_data_ptr, raw_write_data)
|
|
}
|
|
|
|
/**
|
|
* Write binary data to tcp stream; Returns a `future::Future` value
|
|
* immediately
|
|
*
|
|
* # Safety
|
|
*
|
|
* This function can produce unsafe results if:
|
|
*
|
|
* 1. the call to `write_future` is made
|
|
* 2. the `future::Future` value returned is never resolved via
|
|
* `Future::get`
|
|
* 3. and then the `TcpSocket` passed in to `write_future` leaves
|
|
* scope and is destructed before the task that runs the libuv write
|
|
* operation completes.
|
|
*
|
|
* As such: If using `write_future`, always be sure to resolve the returned
|
|
* `Future` so as to ensure libuv doesn't try to access a released write
|
|
* handle. Otherwise, use the blocking `tcp::write` function instead.
|
|
*
|
|
* # Arguments
|
|
*
|
|
* * sock - a `TcpSocket` to write to
|
|
* * raw_write_data - a vector of `~[u8]` that will be written to the stream.
|
|
* This value must remain valid for the duration of the `write` call
|
|
*
|
|
* # Returns
|
|
*
|
|
* A `Future` value that, once the `write` operation completes, resolves to a
|
|
* `Result` object with a `nil` value as the `Ok` variant, or a `TcpErrData`
|
|
* value as the `Err` variant
|
|
*/
|
|
pub fn write_future(sock: &TcpSocket, raw_write_data: ~[u8])
|
|
-> future::Future<result::Result<(), TcpErrData>>
|
|
{
|
|
let socket_data_ptr: *TcpSocketData = &*sock.socket_data;
|
|
do future_spawn {
|
|
let data_copy = copy(raw_write_data);
|
|
write_common_impl(socket_data_ptr, data_copy)
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Begin reading binary data from an open TCP connection; used with
|
|
* `read_stop`
|
|
*
|
|
* # Arguments
|
|
*
|
|
* * sock -- a `net::tcp::TcpSocket` for the connection to read from
|
|
*
|
|
* # Returns
|
|
*
|
|
* * A `Result` instance that will either contain a
|
|
* `core::comm::Port<Result<~[u8], TcpErrData>>` that the user can read
|
|
* (and * optionally, loop on) from until `read_stop` is called, or a
|
|
* `TcpErrData` record
|
|
*/
|
|
pub fn read_start(sock: &TcpSocket)
|
|
-> result::Result<@Port<result::Result<~[u8],
|
|
TcpErrData>>,
|
|
TcpErrData> {
|
|
let socket_data: *TcpSocketData = &*sock.socket_data;
|
|
read_start_common_impl(socket_data)
|
|
}
|
|
|
|
/**
|
|
* Stop reading from an open TCP connection; used with `read_start`
|
|
*
|
|
* # Arguments
|
|
*
|
|
* * `sock` - a `net::tcp::TcpSocket` that you wish to stop reading on
|
|
*/
|
|
pub fn read_stop(sock: &TcpSocket) -> result::Result<(), TcpErrData> {
|
|
let socket_data: *TcpSocketData = &*sock.socket_data;
|
|
read_stop_common_impl(socket_data)
|
|
}
|
|
|
|
/**
|
|
* Reads a single chunk of data from `TcpSocket`; block until data/error
|
|
* recv'd
|
|
*
|
|
* Does a blocking read operation for a single chunk of data from a
|
|
* `TcpSocket` until a data arrives or an error is received. The provided
|
|
* `timeout_msecs` value is used to raise an error if the timeout period
|
|
* passes without any data received.
|
|
*
|
|
* # Arguments
|
|
*
|
|
* * `sock` - a `net::tcp::TcpSocket` that you wish to read from
|
|
* * `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
|
|
* read attempt. Pass `0u` to wait indefinitely
|
|
*/
|
|
pub fn read(sock: &TcpSocket, timeout_msecs: uint)
|
|
-> result::Result<~[u8],TcpErrData> {
|
|
let socket_data: *TcpSocketData = &*sock.socket_data;
|
|
read_common_impl(socket_data, timeout_msecs)
|
|
}
|
|
|
|
/**
|
|
* Reads a single chunk of data; returns a `future::Future<~[u8]>`
|
|
* immediately
|
|
*
|
|
* Does a non-blocking read operation for a single chunk of data from a
|
|
* `TcpSocket` and immediately returns a `Future` value representing the
|
|
* result. When resolving the returned `Future`, it will block until data
|
|
* arrives or an error is received. The provided `timeout_msecs`
|
|
* value is used to raise an error if the timeout period passes without any
|
|
* data received.
|
|
*
|
|
* # Safety
|
|
*
|
|
* This function can produce unsafe results if the call to `read_future` is
|
|
* made, the `future::Future` value returned is never resolved via
|
|
* `Future::get`, and then the `TcpSocket` passed in to `read_future` leaves
|
|
* scope and is destructed before the task that runs the libuv read
|
|
* operation completes.
|
|
*
|
|
* As such: If using `read_future`, always be sure to resolve the returned
|
|
* `Future` so as to ensure libuv doesn't try to access a released read
|
|
* handle. Otherwise, use the blocking `tcp::read` function instead.
|
|
*
|
|
* # Arguments
|
|
*
|
|
* * `sock` - a `net::tcp::TcpSocket` that you wish to read from
|
|
* * `timeout_msecs` - a `uint` value, in msecs, to wait before dropping the
|
|
* read attempt. Pass `0u` to wait indefinitely
|
|
*/
|
|
fn read_future(sock: &TcpSocket, timeout_msecs: uint)
|
|
-> future::Future<result::Result<~[u8],TcpErrData>> {
|
|
let socket_data: *TcpSocketData = &*sock.socket_data;
|
|
do future_spawn {
|
|
read_common_impl(socket_data, timeout_msecs)
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Bind an incoming client connection to a `net::tcp::TcpSocket`
|
|
*
|
|
* # Notes
|
|
*
|
|
* It is safe to call `net::tcp::accept` _only_ within the context of the
|
|
* `new_connect_cb` callback provided as the final argument to the
|
|
* `net::tcp::listen` function.
|
|
*
|
|
* The `new_conn` opaque value is provided _only_ as the first argument to the
|
|
* `new_connect_cb` provided as a part of `net::tcp::listen`.
|
|
* It can be safely sent to another task but it _must_ be
|
|
* used (via `net::tcp::accept`) before the `new_connect_cb` call it was
|
|
* provided to returns.
|
|
*
|
|
* This implies that a port/chan pair must be used to make sure that the
|
|
* `new_connect_cb` call blocks until an attempt to create a
|
|
* `net::tcp::TcpSocket` is completed.
|
|
*
|
|
* # Example
|
|
*
|
|
* Here, the `new_conn` is used in conjunction with `accept` from within
|
|
* a task spawned by the `new_connect_cb` passed into `listen`
|
|
*
|
|
* ~~~ {.rust}
|
|
* do net::tcp::listen(remote_ip, remote_port, backlog, iotask,
|
|
* // this callback is ran once after the connection is successfully
|
|
* // set up
|
|
* |kill_ch| {
|
|
* // pass the kill_ch to your main loop or wherever you want
|
|
* // to be able to externally kill the server from
|
|
* })
|
|
* // this callback is ran when a new connection arrives
|
|
* |new_conn, kill_ch| {
|
|
* let (cont_po, cont_ch) = comm::stream::<option::Option<TcpErrData>>();
|
|
* do task::spawn {
|
|
* let accept_result = net::tcp::accept(new_conn);
|
|
* match accept_result {
|
|
* Err(accept_error) => {
|
|
* cont_ch.send(Some(accept_error));
|
|
* // fail?
|
|
* },
|
|
* Ok(sock) => {
|
|
* cont_ch.send(None);
|
|
* // do work here
|
|
* }
|
|
* }
|
|
* };
|
|
* match cont_po.recv() {
|
|
* // shut down listen()
|
|
* Some(err_data) => kill_ch.send(Some(err_data)),
|
|
* // wait for next connection
|
|
* None => ()
|
|
* }
|
|
* };
|
|
* ~~~
|
|
*
|
|
* # Arguments
|
|
*
|
|
* * `new_conn` - an opaque value used to create a new `TcpSocket`
|
|
*
|
|
* # Returns
|
|
*
|
|
* On success, this function will return a `net::tcp::TcpSocket` as the
|
|
* `Ok` variant of a `Result`. The `net::tcp::TcpSocket` is anchored within
|
|
* the task that `accept` was called within for its lifetime. On failure,
|
|
* this function will return a `net::tcp::TcpErrData` record
|
|
* as the `Err` variant of a `Result`.
|
|
*/
|
|
pub fn accept(new_conn: TcpNewConnection)
|
|
-> result::Result<TcpSocket, TcpErrData> {
|
|
unsafe {
|
|
match new_conn{
|
|
NewTcpConn(server_handle_ptr) => {
|
|
let server_data_ptr = uv::ll::get_data_for_uv_handle(
|
|
server_handle_ptr) as *TcpListenFcData;
|
|
let (reader_po, reader_ch) = stream::<
|
|
Result<~[u8], TcpErrData>>();
|
|
let reader_ch = SharedChan::new(reader_ch);
|
|
let iotask = &(*server_data_ptr).iotask;
|
|
let stream_handle_ptr = malloc_uv_tcp_t();
|
|
*(stream_handle_ptr as *mut uv::ll::uv_tcp_t) =
|
|
uv::ll::tcp_t();
|
|
let client_socket_data: @TcpSocketData = @TcpSocketData {
|
|
reader_po: @reader_po,
|
|
reader_ch: reader_ch,
|
|
stream_handle_ptr : stream_handle_ptr,
|
|
connect_req : uv::ll::connect_t(),
|
|
write_req : uv::ll::write_t(),
|
|
ipv6: (*server_data_ptr).ipv6,
|
|
iotask : iotask.clone()
|
|
};
|
|
let client_socket_data_ptr: *TcpSocketData =
|
|
&*client_socket_data;
|
|
let client_stream_handle_ptr =
|
|
(*client_socket_data_ptr).stream_handle_ptr;
|
|
|
|
let (result_po, result_ch) = stream::<Option<TcpErrData>>();
|
|
let result_ch = SharedChan::new(result_ch);
|
|
|
|
// UNSAFE LIBUV INTERACTION BEGIN
|
|
// .. normally this happens within the context of
|
|
// a call to uv::hl::interact.. but we're breaking
|
|
// the rules here because this always has to be
|
|
// called within the context of a listen() new_connect_cb
|
|
// callback (or it will likely fail and drown your cat)
|
|
debug!("in interact cb for tcp::accept");
|
|
let loop_ptr = uv::ll::get_loop_for_uv_handle(
|
|
server_handle_ptr);
|
|
match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
|
|
0i32 => {
|
|
debug!("uv_tcp_init successful for \
|
|
client stream");
|
|
match uv::ll::accept(
|
|
server_handle_ptr as *libc::c_void,
|
|
client_stream_handle_ptr as *libc::c_void) {
|
|
0i32 => {
|
|
debug!("successfully accepted client \
|
|
connection: \
|
|
stream %x, \
|
|
socket data %x",
|
|
client_stream_handle_ptr as uint,
|
|
client_socket_data_ptr as uint);
|
|
uv::ll::set_data_for_uv_handle(
|
|
client_stream_handle_ptr,
|
|
client_socket_data_ptr
|
|
as *libc::c_void);
|
|
let ptr = uv::ll::get_data_for_uv_handle(
|
|
client_stream_handle_ptr);
|
|
debug!("ptrs: %x %x",
|
|
client_socket_data_ptr as uint,
|
|
ptr as uint);
|
|
result_ch.send(None);
|
|
}
|
|
_ => {
|
|
debug!("failed to accept client conn");
|
|
result_ch.send(Some(
|
|
uv::ll::get_last_err_data(
|
|
loop_ptr).to_tcp_err()));
|
|
}
|
|
}
|
|
}
|
|
_ => {
|
|
debug!("failed to accept client stream");
|
|
result_ch.send(Some(
|
|
uv::ll::get_last_err_data(
|
|
loop_ptr).to_tcp_err()));
|
|
}
|
|
}
|
|
// UNSAFE LIBUV INTERACTION END
|
|
match result_po.recv() {
|
|
Some(err_data) => result::Err(err_data),
|
|
None => result::Ok(TcpSocket(client_socket_data))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Bind to a given IP/port and listen for new connections
|
|
*
|
|
* # Arguments
|
|
*
|
|
* * `host_ip` - a `net::ip::IpAddr` representing a unique IP
|
|
* (versions 4 or 6)
|
|
* * `port` - a uint representing the port to listen on
|
|
* * `backlog` - a uint representing the number of incoming connections
|
|
* to cache in memory
|
|
* * `hl_loop` - a `uv_iotask::IoTask` that the tcp request will run on
|
|
* * `on_establish_cb` - a callback that is evaluated if/when the listener
|
|
* is successfully established. it takes no parameters
|
|
* * `new_connect_cb` - a callback to be evaluated, on the libuv thread,
|
|
* whenever a client attempts to conect on the provided ip/port. the
|
|
* callback's arguments are:
|
|
* * `new_conn` - an opaque type that can be passed to
|
|
* `net::tcp::accept` in order to be converted to a `TcpSocket`.
|
|
* * `kill_ch` - channel of type `core::comm::Chan<Option<tcp_err_data>>`.
|
|
* this channel can be used to send a message to cause `listen` to begin
|
|
* closing the underlying libuv data structures.
|
|
*
|
|
* # returns
|
|
*
|
|
* a `Result` instance containing empty data of type `()` on a
|
|
* successful/normal shutdown, and a `TcpListenErrData` enum in the event
|
|
* of listen exiting because of an error
|
|
*/
|
|
pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint,
|
|
iotask: &IoTask,
|
|
on_establish_cb: ~fn(SharedChan<Option<TcpErrData>>),
|
|
new_connect_cb: ~fn(TcpNewConnection,
|
|
SharedChan<Option<TcpErrData>>))
|
|
-> result::Result<(), TcpListenErrData> {
|
|
do listen_common(host_ip, port, backlog, iotask,
|
|
on_establish_cb)
|
|
// on_connect_cb
|
|
|handle| {
|
|
unsafe {
|
|
let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
|
|
as *TcpListenFcData;
|
|
let new_conn = NewTcpConn(handle);
|
|
let kill_ch = (*server_data_ptr).kill_ch.clone();
|
|
new_connect_cb(new_conn, kill_ch);
|
|
}
|
|
}
|
|
}
|
|
|
|
fn listen_common(host_ip: ip::IpAddr,
|
|
port: uint,
|
|
backlog: uint,
|
|
iotask: &IoTask,
|
|
on_establish_cb: ~fn(SharedChan<Option<TcpErrData>>),
|
|
on_connect_cb: ~fn(*uv::ll::uv_tcp_t))
|
|
-> result::Result<(), TcpListenErrData> {
|
|
let (stream_closed_po, stream_closed_ch) = stream::<()>();
|
|
let stream_closed_ch = SharedChan::new(stream_closed_ch);
|
|
let (kill_po, kill_ch) = stream::<Option<TcpErrData>>();
|
|
let kill_ch = SharedChan::new(kill_ch);
|
|
let server_stream = uv::ll::tcp_t();
|
|
let server_stream_ptr: *uv::ll::uv_tcp_t = &server_stream;
|
|
let server_data: TcpListenFcData = TcpListenFcData {
|
|
server_stream_ptr: server_stream_ptr,
|
|
stream_closed_ch: stream_closed_ch,
|
|
kill_ch: kill_ch.clone(),
|
|
on_connect_cb: on_connect_cb,
|
|
iotask: iotask.clone(),
|
|
ipv6: match &host_ip {
|
|
&ip::Ipv4(_) => { false }
|
|
&ip::Ipv6(_) => { true }
|
|
},
|
|
active: @mut true
|
|
};
|
|
let server_data_ptr: *TcpListenFcData = &server_data;
|
|
|
|
let (setup_po, setup_ch) = stream();
|
|
|
|
// this is to address a compiler warning about
|
|
// an implicit copy.. it seems that double nested
|
|
// will defeat a move sigil, as is done to the host_ip
|
|
// arg above.. this same pattern works w/o complaint in
|
|
// tcp::connect (because the iotask::interact cb isn't
|
|
// nested within a core::comm::listen block)
|
|
let loc_ip = copy(host_ip);
|
|
do iotask::interact(iotask) |loop_ptr| {
|
|
unsafe {
|
|
match uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
|
|
0i32 => {
|
|
uv::ll::set_data_for_uv_handle(
|
|
server_stream_ptr,
|
|
server_data_ptr);
|
|
let addr_str = ip::format_addr(&loc_ip);
|
|
let bind_result = match loc_ip {
|
|
ip::Ipv4(ref addr) => {
|
|
debug!("addr: %?", addr);
|
|
let in_addr = uv::ll::ip4_addr(
|
|
addr_str,
|
|
port as int);
|
|
uv::ll::tcp_bind(server_stream_ptr, &in_addr)
|
|
}
|
|
ip::Ipv6(ref addr) => {
|
|
debug!("addr: %?", addr);
|
|
let in_addr = uv::ll::ip6_addr(
|
|
addr_str,
|
|
port as int);
|
|
uv::ll::tcp_bind6(server_stream_ptr, &in_addr)
|
|
}
|
|
};
|
|
match bind_result {
|
|
0i32 => {
|
|
match uv::ll::listen(
|
|
server_stream_ptr,
|
|
backlog as libc::c_int,
|
|
tcp_lfc_on_connection_cb) {
|
|
0i32 => setup_ch.send(None),
|
|
_ => {
|
|
debug!(
|
|
"failure to uv_tcp_init");
|
|
let err_data =
|
|
uv::ll::get_last_err_data(
|
|
loop_ptr);
|
|
setup_ch.send(Some(err_data));
|
|
}
|
|
}
|
|
}
|
|
_ => {
|
|
debug!("failure to uv_tcp_bind");
|
|
let err_data = uv::ll::get_last_err_data(
|
|
loop_ptr);
|
|
setup_ch.send(Some(err_data));
|
|
}
|
|
}
|
|
}
|
|
_ => {
|
|
debug!("failure to uv_tcp_bind");
|
|
let err_data = uv::ll::get_last_err_data(
|
|
loop_ptr);
|
|
setup_ch.send(Some(err_data));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
let setup_result = setup_po.recv();
|
|
|
|
match setup_result {
|
|
Some(ref err_data) => {
|
|
do iotask::interact(iotask) |loop_ptr| {
|
|
unsafe {
|
|
debug!(
|
|
"tcp::listen post-kill recv hl interact %?",
|
|
loop_ptr);
|
|
*(*server_data_ptr).active = false;
|
|
uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
|
|
}
|
|
};
|
|
stream_closed_po.recv();
|
|
match err_data.err_name {
|
|
~"EACCES" => {
|
|
debug!("Got EACCES error");
|
|
result::Err(AccessDenied)
|
|
}
|
|
~"EADDRINUSE" => {
|
|
debug!("Got EADDRINUSE error");
|
|
result::Err(AddressInUse)
|
|
}
|
|
_ => {
|
|
debug!("Got '%s' '%s' libuv error",
|
|
err_data.err_name, err_data.err_msg);
|
|
result::Err(
|
|
GenericListenErr(copy err_data.err_name,
|
|
copy err_data.err_msg))
|
|
}
|
|
}
|
|
}
|
|
None => {
|
|
on_establish_cb(kill_ch.clone());
|
|
let kill_result = kill_po.recv();
|
|
do iotask::interact(iotask) |loop_ptr| {
|
|
unsafe {
|
|
debug!(
|
|
"tcp::listen post-kill recv hl interact %?",
|
|
loop_ptr);
|
|
*(*server_data_ptr).active = false;
|
|
uv::ll::close(server_stream_ptr, tcp_lfc_close_cb);
|
|
}
|
|
};
|
|
stream_closed_po.recv();
|
|
match kill_result {
|
|
// some failure post bind/listen
|
|
Some(ref err_data) => result::Err(GenericListenErr(
|
|
copy err_data.err_name,
|
|
copy err_data.err_msg)),
|
|
// clean exit
|
|
None => result::Ok(())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* Convert a `net::tcp::TcpSocket` to a `net::tcp::TcpSocketBuf`.
|
|
*
|
|
* This function takes ownership of a `net::tcp::TcpSocket`, returning it
|
|
* stored within a buffered wrapper, which can be converted to a `io::Reader`
|
|
* or `io::Writer`
|
|
*
|
|
* # Arguments
|
|
*
|
|
* * `sock` -- a `net::tcp::TcpSocket` that you want to buffer
|
|
*
|
|
* # Returns
|
|
*
|
|
* A buffered wrapper that you can cast as an `io::Reader` or `io::Writer`
|
|
*/
|
|
pub fn socket_buf(sock: TcpSocket) -> TcpSocketBuf {
|
|
TcpSocketBuf(@mut TcpBufferedSocketData {
|
|
sock: sock, buf: ~[], buf_off: 0
|
|
})
|
|
}
|
|
|
|
/// Convenience methods extending `net::tcp::TcpSocket`
|
|
impl TcpSocket {
|
|
pub fn read_start(&self) -> result::Result<@Port<
|
|
result::Result<~[u8], TcpErrData>>, TcpErrData> {
|
|
read_start(self)
|
|
}
|
|
pub fn read_stop(&self) ->
|
|
result::Result<(), TcpErrData> {
|
|
read_stop(self)
|
|
}
|
|
pub fn read(&self, timeout_msecs: uint) ->
|
|
result::Result<~[u8], TcpErrData> {
|
|
read(self, timeout_msecs)
|
|
}
|
|
pub fn read_future(&self, timeout_msecs: uint) ->
|
|
future::Future<result::Result<~[u8], TcpErrData>> {
|
|
read_future(self, timeout_msecs)
|
|
}
|
|
pub fn write(&self, raw_write_data: ~[u8])
|
|
-> result::Result<(), TcpErrData> {
|
|
write(self, raw_write_data)
|
|
}
|
|
pub fn write_future(&self, raw_write_data: ~[u8])
|
|
-> future::Future<result::Result<(), TcpErrData>> {
|
|
write_future(self, raw_write_data)
|
|
}
|
|
pub fn get_peer_addr(&self) -> ip::IpAddr {
|
|
unsafe {
|
|
if self.socket_data.ipv6 {
|
|
let addr = uv::ll::ip6_addr("", 0);
|
|
uv::ll::tcp_getpeername6(self.socket_data.stream_handle_ptr,
|
|
&addr);
|
|
ip::Ipv6(addr)
|
|
} else {
|
|
let addr = uv::ll::ip4_addr("", 0);
|
|
uv::ll::tcp_getpeername(self.socket_data.stream_handle_ptr,
|
|
&addr);
|
|
ip::Ipv4(addr)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Implementation of `io::Reader` trait for a buffered `net::tcp::TcpSocket`
|
|
impl io::Reader for TcpSocketBuf {
|
|
fn read(&self, buf: &mut [u8], len: uint) -> uint {
|
|
if len == 0 { return 0 }
|
|
let mut count: uint = 0;
|
|
|
|
loop {
|
|
assert!(count < len);
|
|
|
|
// If possible, copy up to `len` bytes from the internal
|
|
// `data.buf` into `buf`
|
|
let nbuffered = vec::uniq_len(&const self.data.buf) -
|
|
self.data.buf_off;
|
|
let needed = len - count;
|
|
if nbuffered > 0 {
|
|
unsafe {
|
|
let ncopy = uint::min(nbuffered, needed);
|
|
let dst = ptr::mut_offset(
|
|
vec::raw::to_mut_ptr(buf), count);
|
|
let src = ptr::offset(
|
|
vec::raw::to_ptr(self.data.buf),
|
|
self.data.buf_off);
|
|
ptr::copy_memory(dst, src, ncopy);
|
|
self.data.buf_off += ncopy;
|
|
count += ncopy;
|
|
}
|
|
}
|
|
|
|
assert!(count <= len);
|
|
if count == len {
|
|
break;
|
|
}
|
|
|
|
// We copied all the bytes we had in the internal buffer into
|
|
// the result buffer, but the caller wants more bytes, so we
|
|
// need to read in data from the socket. Note that the internal
|
|
// buffer is of no use anymore as we read all bytes from it,
|
|
// so we can throw it away.
|
|
let read_result = {
|
|
let data = &*self.data;
|
|
read(&data.sock, 0)
|
|
};
|
|
if read_result.is_err() {
|
|
let err_data = read_result.get_err();
|
|
|
|
if err_data.err_name == ~"EOF" {
|
|
*self.end_of_stream = true;
|
|
break;
|
|
} else {
|
|
debug!("ERROR sock_buf as io::reader.read err %? %?",
|
|
err_data.err_name, err_data.err_msg);
|
|
// As we have already copied data into result buffer,
|
|
// we cannot simply return 0 here. Instead the error
|
|
// should show up in a later call to read().
|
|
break;
|
|
}
|
|
} else {
|
|
self.data.buf = result::unwrap(read_result);
|
|
self.data.buf_off = 0;
|
|
}
|
|
}
|
|
|
|
count
|
|
}
|
|
fn read_byte(&self) -> int {
|
|
loop {
|
|
if vec::uniq_len(&const self.data.buf) > self.data.buf_off {
|
|
let c = self.data.buf[self.data.buf_off];
|
|
self.data.buf_off += 1;
|
|
return c as int
|
|
}
|
|
|
|
let read_result = {
|
|
let data = &*self.data;
|
|
read(&data.sock, 0)
|
|
};
|
|
if read_result.is_err() {
|
|
let err_data = read_result.get_err();
|
|
|
|
if err_data.err_name == ~"EOF" {
|
|
*self.end_of_stream = true;
|
|
return -1
|
|
} else {
|
|
debug!("ERROR sock_buf as io::reader.read err %? %?",
|
|
err_data.err_name, err_data.err_msg);
|
|
fail!()
|
|
}
|
|
} else {
|
|
self.data.buf = result::unwrap(read_result);
|
|
self.data.buf_off = 0;
|
|
}
|
|
}
|
|
}
|
|
fn eof(&self) -> bool {
|
|
*self.end_of_stream
|
|
}
|
|
fn seek(&self, dist: int, seek: io::SeekStyle) {
|
|
debug!("tcp_socket_buf seek stub %? %?", dist, seek);
|
|
// noop
|
|
}
|
|
fn tell(&self) -> uint {
|
|
0u // noop
|
|
}
|
|
}
|
|
|
|
/// Implementation of `io::Reader` trait for a buffered `net::tcp::TcpSocket`
|
|
impl io::Writer for TcpSocketBuf {
|
|
pub fn write(&self, data: &[u8]) {
|
|
let socket_data_ptr: *TcpSocketData =
|
|
&(*((*(self.data)).sock).socket_data);
|
|
let w_result = write_common_impl(socket_data_ptr,
|
|
vec::slice(data,
|
|
0,
|
|
data.len()).to_vec());
|
|
if w_result.is_err() {
|
|
let err_data = w_result.get_err();
|
|
debug!(
|
|
"ERROR sock_buf as io::writer.writer err: %? %?",
|
|
err_data.err_name, err_data.err_msg);
|
|
}
|
|
}
|
|
fn seek(&self, dist: int, seek: io::SeekStyle) {
|
|
debug!("tcp_socket_buf seek stub %? %?", dist, seek);
|
|
// noop
|
|
}
|
|
fn tell(&self) -> uint {
|
|
0u
|
|
}
|
|
fn flush(&self) -> int {
|
|
0
|
|
}
|
|
fn get_type(&self) -> io::WriterType {
|
|
io::File
|
|
}
|
|
}
|
|
|
|
// INTERNAL API
|
|
|
|
fn tear_down_socket_data(socket_data: @TcpSocketData) {
|
|
unsafe {
|
|
let (closed_po, closed_ch) = stream::<()>();
|
|
let closed_ch = SharedChan::new(closed_ch);
|
|
let close_data = TcpSocketCloseData {
|
|
closed_ch: closed_ch
|
|
};
|
|
let close_data_ptr: *TcpSocketCloseData = &close_data;
|
|
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
|
|
do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
|
|
unsafe {
|
|
debug!(
|
|
"interact dtor for tcp_socket stream %? loop %?",
|
|
stream_handle_ptr, loop_ptr);
|
|
uv::ll::set_data_for_uv_handle(stream_handle_ptr,
|
|
close_data_ptr);
|
|
uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
|
|
}
|
|
};
|
|
closed_po.recv();
|
|
//the line below will most likely crash
|
|
//log(debug, fmt!("about to free socket_data at %?", socket_data));
|
|
rustrt::rust_uv_current_kernel_free(stream_handle_ptr
|
|
as *libc::c_void);
|
|
debug!("exiting dtor for tcp_socket");
|
|
}
|
|
}
|
|
|
|
// shared implementation for tcp::read
|
|
fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint)
|
|
-> result::Result<~[u8],TcpErrData> {
|
|
unsafe {
|
|
use timer;
|
|
|
|
debug!("starting tcp::read");
|
|
let iotask = &(*socket_data).iotask;
|
|
let rs_result = read_start_common_impl(socket_data);
|
|
if result::is_err(&rs_result) {
|
|
let err_data = result::get_err(&rs_result);
|
|
result::Err(err_data)
|
|
}
|
|
else {
|
|
debug!("tcp::read before recv_timeout");
|
|
let read_result = if timeout_msecs > 0u {
|
|
timer::recv_timeout(
|
|
iotask, timeout_msecs, result::unwrap(rs_result))
|
|
} else {
|
|
Some(result::get(&rs_result).recv())
|
|
};
|
|
debug!("tcp::read after recv_timeout");
|
|
match read_result {
|
|
None => {
|
|
debug!("tcp::read: timed out..");
|
|
let err_data = TcpErrData {
|
|
err_name: ~"TIMEOUT",
|
|
err_msg: ~"req timed out"
|
|
};
|
|
read_stop_common_impl(socket_data);
|
|
result::Err(err_data)
|
|
}
|
|
Some(data_result) => {
|
|
debug!("tcp::read got data");
|
|
read_stop_common_impl(socket_data);
|
|
data_result
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// shared impl for read_stop
|
|
fn read_stop_common_impl(socket_data: *TcpSocketData) ->
|
|
result::Result<(), TcpErrData> {
|
|
unsafe {
|
|
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
|
|
let (stop_po, stop_ch) = stream::<Option<TcpErrData>>();
|
|
do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
|
|
unsafe {
|
|
debug!("in interact cb for tcp::read_stop");
|
|
match uv::ll::read_stop(stream_handle_ptr
|
|
as *uv::ll::uv_stream_t) {
|
|
0i32 => {
|
|
debug!("successfully called uv_read_stop");
|
|
stop_ch.send(None);
|
|
}
|
|
_ => {
|
|
debug!("failure in calling uv_read_stop");
|
|
let err_data = uv::ll::get_last_err_data(loop_ptr);
|
|
stop_ch.send(Some(err_data.to_tcp_err()));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
match stop_po.recv() {
|
|
Some(err_data) => Err(err_data),
|
|
None => Ok(())
|
|
}
|
|
}
|
|
}
|
|
|
|
// shared impl for read_start
|
|
fn read_start_common_impl(socket_data: *TcpSocketData)
|
|
-> result::Result<@Port<
|
|
result::Result<~[u8], TcpErrData>>, TcpErrData> {
|
|
unsafe {
|
|
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
|
|
let (start_po, start_ch) = stream::<Option<uv::ll::uv_err_data>>();
|
|
debug!("in tcp::read_start before interact loop");
|
|
do iotask::interact(&(*socket_data).iotask) |loop_ptr| {
|
|
unsafe {
|
|
debug!("in tcp::read_start interact cb %?",
|
|
loop_ptr);
|
|
match uv::ll::read_start(stream_handle_ptr
|
|
as *uv::ll::uv_stream_t,
|
|
on_alloc_cb,
|
|
on_tcp_read_cb) {
|
|
0i32 => {
|
|
debug!("success doing uv_read_start");
|
|
start_ch.send(None);
|
|
}
|
|
_ => {
|
|
debug!("error attempting uv_read_start");
|
|
let err_data = uv::ll::get_last_err_data(loop_ptr);
|
|
start_ch.send(Some(err_data));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
match start_po.recv() {
|
|
Some(ref err_data) => result::Err(
|
|
err_data.to_tcp_err()),
|
|
None => {
|
|
result::Ok((*socket_data).reader_po)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// helper to convert a "class" vector of [u8] to a *[uv::ll::uv_buf_t]
|
|
|
|
// shared implementation used by write and write_future
|
|
fn write_common_impl(socket_data_ptr: *TcpSocketData,
|
|
raw_write_data: ~[u8])
|
|
-> result::Result<(), TcpErrData> {
|
|
unsafe {
|
|
let write_req_ptr: *uv::ll::uv_write_t =
|
|
&(*socket_data_ptr).write_req;
|
|
let stream_handle_ptr =
|
|
(*socket_data_ptr).stream_handle_ptr;
|
|
let write_buf_vec = ~[
|
|
uv::ll::buf_init(vec::raw::to_ptr(raw_write_data),
|
|
raw_write_data.len())
|
|
];
|
|
let write_buf_vec_ptr: *~[uv::ll::uv_buf_t] = &write_buf_vec;
|
|
let (result_po, result_ch) = stream::<TcpWriteResult>();
|
|
let result_ch = SharedChan::new(result_ch);
|
|
let write_data = WriteReqData {
|
|
result_ch: result_ch
|
|
};
|
|
let write_data_ptr: *WriteReqData = &write_data;
|
|
do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| {
|
|
unsafe {
|
|
debug!("in interact cb for tcp::write %?",
|
|
loop_ptr);
|
|
match uv::ll::write(write_req_ptr,
|
|
stream_handle_ptr,
|
|
write_buf_vec_ptr,
|
|
tcp_write_complete_cb) {
|
|
0i32 => {
|
|
debug!("uv_write() invoked successfully");
|
|
uv::ll::set_data_for_req(write_req_ptr,
|
|
write_data_ptr);
|
|
}
|
|
_ => {
|
|
debug!("error invoking uv_write()");
|
|
let err_data = uv::ll::get_last_err_data(loop_ptr);
|
|
let result_ch = (*write_data_ptr).result_ch.clone();
|
|
result_ch.send(TcpWriteError(err_data.to_tcp_err()));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
// FIXME (#2656): Instead of passing unsafe pointers to local data,
|
|
// and waiting here for the write to complete, we should transfer
|
|
// ownership of everything to the I/O task and let it deal with the
|
|
// aftermath, so we don't have to sit here blocking.
|
|
match result_po.recv() {
|
|
TcpWriteSuccess => Ok(()),
|
|
TcpWriteError(err_data) => Err(err_data)
|
|
}
|
|
}
|
|
}
|
|
|
|
enum TcpNewConnection {
|
|
NewTcpConn(*uv::ll::uv_tcp_t)
|
|
}
|
|
|
|
struct TcpListenFcData {
|
|
server_stream_ptr: *uv::ll::uv_tcp_t,
|
|
stream_closed_ch: SharedChan<()>,
|
|
kill_ch: SharedChan<Option<TcpErrData>>,
|
|
on_connect_cb: ~fn(*uv::ll::uv_tcp_t),
|
|
iotask: IoTask,
|
|
ipv6: bool,
|
|
active: @mut bool,
|
|
}
|
|
|
|
extern fn tcp_lfc_close_cb(handle: *uv::ll::uv_tcp_t) {
|
|
unsafe {
|
|
let server_data_ptr = uv::ll::get_data_for_uv_handle(
|
|
handle) as *TcpListenFcData;
|
|
let stream_closed_ch = (*server_data_ptr).stream_closed_ch.clone();
|
|
stream_closed_ch.send(());
|
|
}
|
|
}
|
|
|
|
extern fn tcp_lfc_on_connection_cb(handle: *uv::ll::uv_tcp_t,
|
|
status: libc::c_int) {
|
|
unsafe {
|
|
let server_data_ptr = uv::ll::get_data_for_uv_handle(handle)
|
|
as *TcpListenFcData;
|
|
let kill_ch = (*server_data_ptr).kill_ch.clone();
|
|
if *(*server_data_ptr).active {
|
|
match status {
|
|
0i32 => ((*server_data_ptr).on_connect_cb)(handle),
|
|
_ => {
|
|
let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
|
|
kill_ch.send(
|
|
Some(uv::ll::get_last_err_data(loop_ptr)
|
|
.to_tcp_err()));
|
|
*(*server_data_ptr).active = false;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
fn malloc_uv_tcp_t() -> *uv::ll::uv_tcp_t {
|
|
unsafe {
|
|
rustrt::rust_uv_current_kernel_malloc(
|
|
rustrt::rust_uv_helper_uv_tcp_t_size()) as *uv::ll::uv_tcp_t
|
|
}
|
|
}
|
|
|
|
enum TcpConnectResult {
|
|
TcpConnected(TcpSocket),
|
|
TcpConnectError(TcpErrData)
|
|
}
|
|
|
|
enum TcpWriteResult {
|
|
TcpWriteSuccess,
|
|
TcpWriteError(TcpErrData)
|
|
}
|
|
|
|
enum TcpReadStartResult {
|
|
TcpReadStartSuccess(Port<TcpReadResult>),
|
|
TcpReadStartError(TcpErrData)
|
|
}
|
|
|
|
enum TcpReadResult {
|
|
TcpReadData(~[u8]),
|
|
TcpReadDone,
|
|
TcpReadErr(TcpErrData)
|
|
}
|
|
|
|
trait ToTcpErr {
|
|
fn to_tcp_err(&self) -> TcpErrData;
|
|
}
|
|
|
|
impl ToTcpErr for uv::ll::uv_err_data {
|
|
fn to_tcp_err(&self) -> TcpErrData {
|
|
TcpErrData { err_name: copy self.err_name, err_msg: copy self.err_msg }
|
|
}
|
|
}
|
|
|
|
extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
|
|
nread: libc::ssize_t,
|
|
buf: uv::ll::uv_buf_t) {
|
|
unsafe {
|
|
debug!("entering on_tcp_read_cb stream: %x nread: %?",
|
|
stream as uint, nread);
|
|
let loop_ptr = uv::ll::get_loop_for_uv_handle(stream);
|
|
let socket_data_ptr = uv::ll::get_data_for_uv_handle(stream)
|
|
as *TcpSocketData;
|
|
debug!("socket data is %x", socket_data_ptr as uint);
|
|
match nread as int {
|
|
// incoming err.. probably eof
|
|
-1 => {
|
|
let err_data = uv::ll::get_last_err_data(loop_ptr).to_tcp_err();
|
|
debug!("on_tcp_read_cb: incoming err.. name %? msg %?",
|
|
err_data.err_name, err_data.err_msg);
|
|
let reader_ch = &(*socket_data_ptr).reader_ch;
|
|
reader_ch.send(result::Err(err_data));
|
|
}
|
|
// do nothing .. unneeded buf
|
|
0 => (),
|
|
// have data
|
|
_ => {
|
|
// we have data
|
|
debug!("tcp on_read_cb nread: %d", nread as int);
|
|
let reader_ch = &(*socket_data_ptr).reader_ch;
|
|
let buf_base = uv::ll::get_base_from_buf(buf);
|
|
let new_bytes = vec::from_buf(buf_base, nread as uint);
|
|
reader_ch.send(result::Ok(new_bytes));
|
|
}
|
|
}
|
|
uv::ll::free_base_of_buf(buf);
|
|
debug!("exiting on_tcp_read_cb");
|
|
}
|
|
}
|
|
|
|
extern fn on_alloc_cb(handle: *libc::c_void,
|
|
suggested_size: size_t)
|
|
-> uv::ll::uv_buf_t {
|
|
unsafe {
|
|
debug!("tcp read on_alloc_cb!");
|
|
let char_ptr = uv::ll::malloc_buf_base_of(suggested_size);
|
|
debug!("tcp read on_alloc_cb h: %? char_ptr: %u sugsize: %u",
|
|
handle,
|
|
char_ptr as uint,
|
|
suggested_size as uint);
|
|
uv::ll::buf_init(char_ptr, suggested_size as uint)
|
|
}
|
|
}
|
|
|
|
struct TcpSocketCloseData {
|
|
closed_ch: SharedChan<()>,
|
|
}
|
|
|
|
extern fn tcp_socket_dtor_close_cb(handle: *uv::ll::uv_tcp_t) {
|
|
unsafe {
|
|
let data = uv::ll::get_data_for_uv_handle(handle)
|
|
as *TcpSocketCloseData;
|
|
let closed_ch = (*data).closed_ch.clone();
|
|
closed_ch.send(());
|
|
debug!("tcp_socket_dtor_close_cb exiting..");
|
|
}
|
|
}
|
|
|
|
extern fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t,
|
|
status: libc::c_int) {
|
|
unsafe {
|
|
let write_data_ptr = uv::ll::get_data_for_req(write_req)
|
|
as *WriteReqData;
|
|
if status == 0i32 {
|
|
debug!("successful write complete");
|
|
let result_ch = (*write_data_ptr).result_ch.clone();
|
|
result_ch.send(TcpWriteSuccess);
|
|
} else {
|
|
let stream_handle_ptr = uv::ll::get_stream_handle_from_write_req(
|
|
write_req);
|
|
let loop_ptr = uv::ll::get_loop_for_uv_handle(stream_handle_ptr);
|
|
let err_data = uv::ll::get_last_err_data(loop_ptr);
|
|
debug!("failure to write");
|
|
let result_ch = (*write_data_ptr).result_ch.clone();
|
|
result_ch.send(TcpWriteError(err_data.to_tcp_err()));
|
|
}
|
|
}
|
|
}
|
|
|
|
struct WriteReqData {
|
|
result_ch: SharedChan<TcpWriteResult>,
|
|
}
|
|
|
|
struct ConnectReqData {
|
|
result_ch: SharedChan<ConnAttempt>,
|
|
closed_signal_ch: SharedChan<()>,
|
|
}
|
|
|
|
extern fn stream_error_close_cb(handle: *uv::ll::uv_tcp_t) {
|
|
unsafe {
|
|
let data = uv::ll::get_data_for_uv_handle(handle) as
|
|
*ConnectReqData;
|
|
let closed_signal_ch = (*data).closed_signal_ch.clone();
|
|
closed_signal_ch.send(());
|
|
debug!("exiting steam_error_close_cb for %?", handle);
|
|
}
|
|
}
|
|
|
|
extern fn tcp_connect_close_cb(handle: *uv::ll::uv_tcp_t) {
|
|
debug!("closed client tcp handle %?", handle);
|
|
}
|
|
|
|
extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t,
|
|
status: libc::c_int) {
|
|
unsafe {
|
|
let conn_data_ptr = (uv::ll::get_data_for_req(connect_req_ptr)
|
|
as *ConnectReqData);
|
|
let result_ch = (*conn_data_ptr).result_ch.clone();
|
|
debug!("tcp_connect result_ch %?", result_ch);
|
|
let tcp_stream_ptr =
|
|
uv::ll::get_stream_handle_from_connect_req(connect_req_ptr);
|
|
match status {
|
|
0i32 => {
|
|
debug!("successful tcp connection!");
|
|
result_ch.send(ConnSuccess);
|
|
}
|
|
_ => {
|
|
debug!("error in tcp_connect_on_connect_cb");
|
|
let loop_ptr = uv::ll::get_loop_for_uv_handle(tcp_stream_ptr);
|
|
let err_data = uv::ll::get_last_err_data(loop_ptr);
|
|
debug!("err_data %? %?", err_data.err_name,
|
|
err_data.err_msg);
|
|
result_ch.send(ConnFailure(err_data));
|
|
uv::ll::set_data_for_uv_handle(tcp_stream_ptr,
|
|
conn_data_ptr);
|
|
uv::ll::close(tcp_stream_ptr, stream_error_close_cb);
|
|
}
|
|
}
|
|
debug!("leaving tcp_connect_on_connect_cb");
|
|
}
|
|
}
|
|
|
|
enum ConnAttempt {
|
|
ConnSuccess,
|
|
ConnFailure(uv::ll::uv_err_data)
|
|
}
|
|
|
|
struct TcpSocketData {
|
|
reader_po: @Port<result::Result<~[u8], TcpErrData>>,
|
|
reader_ch: SharedChan<result::Result<~[u8], TcpErrData>>,
|
|
stream_handle_ptr: *uv::ll::uv_tcp_t,
|
|
connect_req: uv::ll::uv_connect_t,
|
|
write_req: uv::ll::uv_write_t,
|
|
ipv6: bool,
|
|
iotask: IoTask,
|
|
}
|
|
|
|
struct TcpBufferedSocketData {
|
|
sock: TcpSocket,
|
|
buf: ~[u8],
|
|
buf_off: uint
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod test {
|
|
use core::prelude::*;
|
|
|
|
use net::ip;
|
|
use net::tcp::{GenericListenErr, TcpConnectErrData, TcpListenErrData};
|
|
use net::tcp::{connect, accept, read, listen, TcpSocket, socket_buf};
|
|
use net;
|
|
use uv::iotask::IoTask;
|
|
use uv;
|
|
|
|
use core::cell::Cell;
|
|
use core::comm::{stream, SharedChan};
|
|
use core::io;
|
|
use core::result;
|
|
use core::str;
|
|
use core::task;
|
|
|
|
// FIXME don't run on fbsd or linux 32 bit (#2064)
|
|
#[cfg(target_os="win32")]
|
|
#[cfg(target_os="darwin")]
|
|
#[cfg(target_os="linux")]
|
|
#[cfg(target_os="android")]
|
|
mod tcp_ipv4_server_and_client_test {
|
|
#[cfg(target_arch="x86_64")]
|
|
mod impl64 {
|
|
use net::tcp::test::*;
|
|
|
|
#[test]
|
|
fn test_gl_tcp_server_and_client_ipv4() {
|
|
unsafe {
|
|
impl_gl_tcp_ipv4_server_and_client();
|
|
}
|
|
}
|
|
#[test]
|
|
fn test_gl_tcp_get_peer_addr() {
|
|
unsafe {
|
|
impl_gl_tcp_ipv4_get_peer_addr();
|
|
}
|
|
}
|
|
#[test]
|
|
fn test_gl_tcp_ipv4_client_error_connection_refused() {
|
|
unsafe {
|
|
impl_gl_tcp_ipv4_client_error_connection_refused();
|
|
}
|
|
}
|
|
#[test]
|
|
fn test_gl_tcp_server_address_in_use() {
|
|
unsafe {
|
|
impl_gl_tcp_ipv4_server_address_in_use();
|
|
}
|
|
}
|
|
#[test]
|
|
fn test_gl_tcp_server_access_denied() {
|
|
unsafe {
|
|
impl_gl_tcp_ipv4_server_access_denied();
|
|
}
|
|
}
|
|
// Strange failure on Windows. --pcwalton
|
|
#[test]
|
|
#[ignore(cfg(target_os = "win32"))]
|
|
fn test_gl_tcp_ipv4_server_client_reader_writer() {
|
|
impl_gl_tcp_ipv4_server_client_reader_writer();
|
|
}
|
|
#[test]
|
|
fn test_tcp_socket_impl_reader_handles_eof() {
|
|
impl_tcp_socket_impl_reader_handles_eof();
|
|
}
|
|
}
|
|
#[cfg(target_arch="x86")]
|
|
#[cfg(target_arch="arm")]
|
|
#[cfg(target_arch="mips")]
|
|
mod impl32 {
|
|
use net::tcp::test::*;
|
|
|
|
#[test]
|
|
#[ignore(cfg(target_os = "linux"))]
|
|
fn test_gl_tcp_server_and_client_ipv4() {
|
|
unsafe {
|
|
impl_gl_tcp_ipv4_server_and_client();
|
|
}
|
|
}
|
|
#[test]
|
|
#[ignore(cfg(target_os = "linux"))]
|
|
fn test_gl_tcp_get_peer_addr() {
|
|
unsafe {
|
|
impl_gl_tcp_ipv4_get_peer_addr();
|
|
}
|
|
}
|
|
#[test]
|
|
#[ignore(cfg(target_os = "linux"))]
|
|
fn test_gl_tcp_ipv4_client_error_connection_refused() {
|
|
unsafe {
|
|
impl_gl_tcp_ipv4_client_error_connection_refused();
|
|
}
|
|
}
|
|
#[test]
|
|
#[ignore(cfg(target_os = "linux"))]
|
|
fn test_gl_tcp_server_address_in_use() {
|
|
unsafe {
|
|
impl_gl_tcp_ipv4_server_address_in_use();
|
|
}
|
|
}
|
|
#[test]
|
|
#[ignore(cfg(target_os = "linux"))]
|
|
#[ignore(cfg(windows), reason = "deadlocking bots")]
|
|
fn test_gl_tcp_server_access_denied() {
|
|
unsafe {
|
|
impl_gl_tcp_ipv4_server_access_denied();
|
|
}
|
|
}
|
|
#[test]
|
|
#[ignore(cfg(target_os = "linux"))]
|
|
#[ignore(cfg(target_os = "win32"))]
|
|
fn test_gl_tcp_ipv4_server_client_reader_writer() {
|
|
impl_gl_tcp_ipv4_server_client_reader_writer();
|
|
}
|
|
}
|
|
}
|
|
pub fn impl_gl_tcp_ipv4_server_and_client() {
|
|
let hl_loop = &uv::global_loop::get();
|
|
let server_ip = "127.0.0.1";
|
|
let server_port = 8888u;
|
|
let expected_req = ~"ping";
|
|
let expected_resp = "pong";
|
|
|
|
let (server_result_po, server_result_ch) = stream::<~str>();
|
|
|
|
let (cont_po, cont_ch) = stream::<()>();
|
|
let cont_ch = SharedChan::new(cont_ch);
|
|
// server
|
|
let hl_loop_clone = hl_loop.clone();
|
|
do task::spawn_sched(task::ManualThreads(1u)) {
|
|
let cont_ch = cont_ch.clone();
|
|
let actual_req = run_tcp_test_server(
|
|
server_ip,
|
|
server_port,
|
|
expected_resp.to_str(),
|
|
cont_ch.clone(),
|
|
&hl_loop_clone);
|
|
server_result_ch.send(actual_req);
|
|
};
|
|
cont_po.recv();
|
|
// client
|
|
debug!("server started, firing up client..");
|
|
let actual_resp_result = run_tcp_test_client(
|
|
server_ip,
|
|
server_port,
|
|
expected_req,
|
|
hl_loop);
|
|
assert!(actual_resp_result.is_ok());
|
|
let actual_resp = actual_resp_result.get();
|
|
let actual_req = server_result_po.recv();
|
|
debug!("REQ: expected: '%s' actual: '%s'",
|
|
expected_req, actual_req);
|
|
debug!("RESP: expected: '%s' actual: '%s'",
|
|
expected_resp, actual_resp);
|
|
assert!(str::contains(actual_req, expected_req));
|
|
assert!(str::contains(actual_resp, expected_resp));
|
|
}
|
|
pub fn impl_gl_tcp_ipv4_get_peer_addr() {
|
|
let hl_loop = &uv::global_loop::get();
|
|
let server_ip = "127.0.0.1";
|
|
let server_port = 8887u;
|
|
let expected_resp = "pong";
|
|
|
|
let (cont_po, cont_ch) = stream::<()>();
|
|
let cont_ch = SharedChan::new(cont_ch);
|
|
// server
|
|
let hl_loop_clone = hl_loop.clone();
|
|
do task::spawn_sched(task::ManualThreads(1u)) {
|
|
let cont_ch = cont_ch.clone();
|
|
run_tcp_test_server(
|
|
server_ip,
|
|
server_port,
|
|
expected_resp.to_str(),
|
|
cont_ch.clone(),
|
|
&hl_loop_clone);
|
|
};
|
|
cont_po.recv();
|
|
// client
|
|
debug!("server started, firing up client..");
|
|
let server_ip_addr = ip::v4::parse_addr(server_ip);
|
|
let iotask = uv::global_loop::get();
|
|
let connect_result = connect(server_ip_addr, server_port,
|
|
&iotask);
|
|
|
|
let sock = result::unwrap(connect_result);
|
|
|
|
debug!("testing peer address");
|
|
// This is what we are actually testing!
|
|
assert!(net::ip::format_addr(&sock.get_peer_addr()) ==
|
|
~"127.0.0.1");
|
|
assert_eq!(net::ip::get_port(&sock.get_peer_addr()), 8887);
|
|
|
|
// Fulfill the protocol the test server expects
|
|
let resp_bytes = str::to_bytes("ping");
|
|
tcp_write_single(&sock, resp_bytes);
|
|
debug!("message sent");
|
|
sock.read(0u);
|
|
debug!("result read");
|
|
}
|
|
pub fn impl_gl_tcp_ipv4_client_error_connection_refused() {
|
|
let hl_loop = &uv::global_loop::get();
|
|
let server_ip = "127.0.0.1";
|
|
let server_port = 8889u;
|
|
let expected_req = ~"ping";
|
|
// client
|
|
debug!("firing up client..");
|
|
let actual_resp_result = run_tcp_test_client(
|
|
server_ip,
|
|
server_port,
|
|
expected_req,
|
|
hl_loop);
|
|
match actual_resp_result.get_err() {
|
|
ConnectionRefused => (),
|
|
_ => fail!("unknown error.. expected connection_refused")
|
|
}
|
|
}
|
|
pub fn impl_gl_tcp_ipv4_server_address_in_use() {
|
|
let hl_loop = &uv::global_loop::get();
|
|
let server_ip = "127.0.0.1";
|
|
let server_port = 8890u;
|
|
let expected_req = ~"ping";
|
|
let expected_resp = "pong";
|
|
|
|
let (cont_po, cont_ch) = stream::<()>();
|
|
let cont_ch = SharedChan::new(cont_ch);
|
|
// server
|
|
let hl_loop_clone = hl_loop.clone();
|
|
do task::spawn_sched(task::ManualThreads(1u)) {
|
|
let cont_ch = cont_ch.clone();
|
|
run_tcp_test_server(
|
|
server_ip,
|
|
server_port,
|
|
expected_resp.to_str(),
|
|
cont_ch.clone(),
|
|
&hl_loop_clone);
|
|
}
|
|
cont_po.recv();
|
|
// this one should fail..
|
|
let listen_err = run_tcp_test_server_fail(
|
|
server_ip,
|
|
server_port,
|
|
hl_loop);
|
|
// client.. just doing this so that the first server tears down
|
|
debug!("server started, firing up client..");
|
|
run_tcp_test_client(
|
|
server_ip,
|
|
server_port,
|
|
expected_req,
|
|
hl_loop);
|
|
match listen_err {
|
|
AddressInUse => {
|
|
assert!(true);
|
|
}
|
|
_ => {
|
|
fail!("expected address_in_use listen error, \
|
|
but got a different error varient. check logs.");
|
|
}
|
|
}
|
|
}
|
|
pub fn impl_gl_tcp_ipv4_server_access_denied() {
|
|
let hl_loop = &uv::global_loop::get();
|
|
let server_ip = "127.0.0.1";
|
|
let server_port = 80u;
|
|
// this one should fail..
|
|
let listen_err = run_tcp_test_server_fail(
|
|
server_ip,
|
|
server_port,
|
|
hl_loop);
|
|
match listen_err {
|
|
AccessDenied => {
|
|
assert!(true);
|
|
}
|
|
_ => {
|
|
fail!("expected address_in_use listen error, \
|
|
but got a different error varient. check logs.");
|
|
}
|
|
}
|
|
}
|
|
pub fn impl_gl_tcp_ipv4_server_client_reader_writer() {
|
|
|
|
let iotask = &uv::global_loop::get();
|
|
let server_ip = "127.0.0.1";
|
|
let server_port = 8891u;
|
|
let expected_req = ~"ping";
|
|
let expected_resp = "pong";
|
|
|
|
let (server_result_po, server_result_ch) = stream::<~str>();
|
|
|
|
let (cont_po, cont_ch) = stream::<()>();
|
|
let cont_ch = SharedChan::new(cont_ch);
|
|
// server
|
|
let iotask_clone = iotask.clone();
|
|
do task::spawn_sched(task::ManualThreads(1u)) {
|
|
let cont_ch = cont_ch.clone();
|
|
let actual_req = run_tcp_test_server(
|
|
server_ip,
|
|
server_port,
|
|
expected_resp.to_str(),
|
|
cont_ch.clone(),
|
|
&iotask_clone);
|
|
server_result_ch.send(actual_req);
|
|
};
|
|
cont_po.recv();
|
|
// client
|
|
let server_addr = ip::v4::parse_addr(server_ip);
|
|
let conn_result = connect(server_addr, server_port, iotask);
|
|
if result::is_err(&conn_result) {
|
|
assert!(false);
|
|
}
|
|
let sock_buf = @socket_buf(result::unwrap(conn_result));
|
|
buf_write(sock_buf, expected_req);
|
|
|
|
// so contrived!
|
|
let actual_resp = do str::as_bytes(&expected_resp.to_str()) |resp_buf| {
|
|
buf_read(sock_buf, resp_buf.len())
|
|
};
|
|
|
|
let actual_req = server_result_po.recv();
|
|
debug!("REQ: expected: '%s' actual: '%s'",
|
|
expected_req, actual_req);
|
|
debug!("RESP: expected: '%s' actual: '%s'",
|
|
expected_resp, actual_resp);
|
|
assert!(str::contains(actual_req, expected_req));
|
|
assert!(str::contains(actual_resp, expected_resp));
|
|
}
|
|
|
|
pub fn impl_tcp_socket_impl_reader_handles_eof() {
|
|
use core::io::{Reader,ReaderUtil};
|
|
|
|
let hl_loop = &uv::global_loop::get();
|
|
let server_ip = "127.0.0.1";
|
|
let server_port = 10041u;
|
|
let expected_req = ~"GET /";
|
|
let expected_resp = "A string\nwith multiple lines\n";
|
|
|
|
let (cont_po, cont_ch) = stream::<()>();
|
|
let cont_ch = SharedChan::new(cont_ch);
|
|
// server
|
|
let hl_loop_clone = hl_loop.clone();
|
|
do task::spawn_sched(task::ManualThreads(1u)) {
|
|
let cont_ch = cont_ch.clone();
|
|
run_tcp_test_server(
|
|
server_ip,
|
|
server_port,
|
|
expected_resp.to_str(),
|
|
cont_ch.clone(),
|
|
&hl_loop_clone);
|
|
};
|
|
cont_po.recv();
|
|
// client
|
|
debug!("server started, firing up client..");
|
|
let server_addr = ip::v4::parse_addr(server_ip);
|
|
let conn_result = connect(server_addr, server_port, hl_loop);
|
|
if result::is_err(&conn_result) {
|
|
assert!(false);
|
|
}
|
|
let sock_buf = @socket_buf(result::unwrap(conn_result));
|
|
buf_write(sock_buf, expected_req);
|
|
|
|
let buf_reader = sock_buf as @Reader;
|
|
let actual_response = str::from_bytes(buf_reader.read_whole_stream());
|
|
debug!("Actual response: %s", actual_response);
|
|
assert!(expected_resp == actual_response);
|
|
}
|
|
|
|
fn buf_write<W:io::Writer>(w: &W, val: &str) {
|
|
debug!("BUF_WRITE: val len %?", str::len(val));
|
|
do str::byte_slice(val) |b_slice| {
|
|
debug!("BUF_WRITE: b_slice len %?",
|
|
b_slice.len());
|
|
w.write(b_slice)
|
|
}
|
|
}
|
|
|
|
fn buf_read<R:io::Reader>(r: &R, len: uint) -> ~str {
|
|
let new_bytes = (*r).read_bytes(len);
|
|
debug!("in buf_read.. new_bytes len: %?",
|
|
new_bytes.len());
|
|
str::from_bytes(new_bytes)
|
|
}
|
|
|
|
fn run_tcp_test_server(server_ip: &str, server_port: uint, resp: ~str,
|
|
cont_ch: SharedChan<()>,
|
|
iotask: &IoTask) -> ~str {
|
|
let (server_po, server_ch) = stream::<~str>();
|
|
let server_ch = SharedChan::new(server_ch);
|
|
let server_ip_addr = ip::v4::parse_addr(server_ip);
|
|
let resp_cell = Cell::new(resp);
|
|
let listen_result = listen(server_ip_addr, server_port, 128,
|
|
iotask,
|
|
// on_establish_cb -- called when listener is set up
|
|
|kill_ch| {
|
|
debug!("establish_cb %?",
|
|
kill_ch);
|
|
cont_ch.send(());
|
|
},
|
|
// risky to run this on the loop, but some users
|
|
// will want the POWER
|
|
|new_conn, kill_ch| {
|
|
let resp_cell2 = Cell::new(resp_cell.take());
|
|
debug!("SERVER: new connection!");
|
|
let (cont_po, cont_ch) = stream();
|
|
let server_ch = server_ch.clone();
|
|
do task::spawn_sched(task::ManualThreads(1u)) {
|
|
debug!("SERVER: starting worker for new req");
|
|
|
|
let accept_result = accept(new_conn);
|
|
debug!("SERVER: after accept()");
|
|
if result::is_err(&accept_result) {
|
|
debug!("SERVER: error accept connection");
|
|
let err_data = result::get_err(&accept_result);
|
|
kill_ch.send(Some(err_data));
|
|
debug!(
|
|
"SERVER/WORKER: send on err cont ch");
|
|
cont_ch.send(());
|
|
}
|
|
else {
|
|
debug!("SERVER/WORKER: send on cont ch");
|
|
cont_ch.send(());
|
|
let sock = result::unwrap(accept_result);
|
|
let peer_addr = sock.get_peer_addr();
|
|
debug!("SERVER: successfully accepted \
|
|
connection from %s:%u",
|
|
ip::format_addr(&peer_addr),
|
|
ip::get_port(&peer_addr));
|
|
let received_req_bytes = read(&sock, 0u);
|
|
match received_req_bytes {
|
|
result::Ok(data) => {
|
|
debug!("SERVER: got REQ str::from_bytes..");
|
|
debug!("SERVER: REQ data len: %?",
|
|
data.len());
|
|
server_ch.send(
|
|
str::from_bytes(data));
|
|
debug!("SERVER: before write");
|
|
tcp_write_single(&sock, str::to_bytes(resp_cell2.take()));
|
|
debug!("SERVER: after write.. die");
|
|
kill_ch.send(None);
|
|
}
|
|
result::Err(err_data) => {
|
|
debug!("SERVER: error recvd: %s %s",
|
|
err_data.err_name, err_data.err_msg);
|
|
kill_ch.send(Some(err_data));
|
|
server_ch.send(~"");
|
|
}
|
|
}
|
|
debug!("SERVER: worker spinning down");
|
|
}
|
|
}
|
|
debug!("SERVER: waiting to recv on cont_ch");
|
|
cont_po.recv();
|
|
});
|
|
// err check on listen_result
|
|
if result::is_err(&listen_result) {
|
|
match result::get_err(&listen_result) {
|
|
GenericListenErr(ref name, ref msg) => {
|
|
fail!("SERVER: exited abnormally name %s msg %s", *name, *msg);
|
|
}
|
|
AccessDenied => {
|
|
fail!("SERVER: exited abnormally, got access denied..");
|
|
}
|
|
AddressInUse => {
|
|
fail!("SERVER: exited abnormally, got address in use...");
|
|
}
|
|
}
|
|
}
|
|
let ret_val = server_po.recv();
|
|
debug!("SERVER: exited and got return val: '%s'", ret_val);
|
|
ret_val
|
|
}
|
|
|
|
fn run_tcp_test_server_fail(server_ip: &str, server_port: uint,
|
|
iotask: &IoTask) -> TcpListenErrData {
|
|
let server_ip_addr = ip::v4::parse_addr(server_ip);
|
|
let listen_result = listen(server_ip_addr, server_port, 128,
|
|
iotask,
|
|
// on_establish_cb -- called when listener is set up
|
|
|kill_ch| {
|
|
debug!("establish_cb %?", kill_ch);
|
|
},
|
|
|new_conn, kill_ch| {
|
|
fail!("SERVER: shouldn't be called.. %? %?", new_conn, kill_ch);
|
|
});
|
|
// err check on listen_result
|
|
if result::is_err(&listen_result) {
|
|
result::get_err(&listen_result)
|
|
}
|
|
else {
|
|
fail!("SERVER: did not fail as expected")
|
|
}
|
|
}
|
|
|
|
fn run_tcp_test_client(server_ip: &str, server_port: uint, resp: &str,
|
|
iotask: &IoTask) -> result::Result<~str,
|
|
TcpConnectErrData> {
|
|
let server_ip_addr = ip::v4::parse_addr(server_ip);
|
|
|
|
debug!("CLIENT: starting..");
|
|
let connect_result = connect(server_ip_addr, server_port,
|
|
iotask);
|
|
if result::is_err(&connect_result) {
|
|
debug!("CLIENT: failed to connect");
|
|
let err_data = result::get_err(&connect_result);
|
|
Err(err_data)
|
|
}
|
|
else {
|
|
let sock = result::unwrap(connect_result);
|
|
let resp_bytes = str::to_bytes(resp);
|
|
tcp_write_single(&sock, resp_bytes);
|
|
let read_result = sock.read(0u);
|
|
if read_result.is_err() {
|
|
debug!("CLIENT: failure to read");
|
|
Ok(~"")
|
|
}
|
|
else {
|
|
let ret_val = str::from_bytes(read_result.get());
|
|
debug!("CLIENT: after client_ch recv ret: '%s'",
|
|
ret_val);
|
|
Ok(ret_val)
|
|
}
|
|
}
|
|
}
|
|
|
|
fn tcp_write_single(sock: &TcpSocket, val: ~[u8]) {
|
|
let mut write_result_future = sock.write_future(val);
|
|
let write_result = write_result_future.get();
|
|
if result::is_err(&write_result) {
|
|
debug!("tcp_write_single: write failed!");
|
|
let err_data = result::get_err(&write_result);
|
|
debug!("tcp_write_single err name: %s msg: %s",
|
|
err_data.err_name, err_data.err_msg);
|
|
// meh. torn on what to do here.
|
|
fail!("tcp_write_single failed");
|
|
}
|
|
}
|
|
}
|