std: splitting out tcp server API + tests
- we now have two interfaces for the TCP/IP server/listener workflow, based on different user approaches surrounding how to deal with the flow of accept a new tcp connection: 1. the "original" API closely mimics the low-level libuv API, in that we have an on_connect_cb that the user provides *that is ran on the libuv thread*. In this callback, the user can accept() a connection, turning it into a tcp_socket.. of course, before accepting, they have the option of passing it to a new task, provided they *make the cb block until the accept is done* .. this is because, in libuv, you have to do the uv_accept call in the span of that on_connect_cb callback that gets fired when a new connection comes in. thems the breaks.. I wanted to just get rid of this API, because the general proposition of users always running code on the libuv thread sounds like an invitation for many future headaches. the API restriction to have to choose to immediately accept a connection (and allow the user to block libuv as needed) isn't too bad for power users who could conceive of circumstances where they would drop an incoming TCP connection and know what they're doing, in general. but as a general API, I thought this was a bit cumbersome, so I ended up devising.. 2. an API that is initiated with a call to `net::tcp::new_listener()` .. has a similar signature to `net::tcp::listen()`, except that is just returns an object that sort of behaves like a `comm::port`. Users can block on the `tcp_conn_port` to receive new connections, either in the current task or in a new task, depending on which API route they take (`net::tcp::conn_recv` or `net::tcp::conn_recv_spawn` respectively).. there is also a `net::tcp::conn_peek` function that will do a peek on the underlying port to see if there are pending connections. The main difference, with this API, is that the low-level libuv glue is going to *accept every connection attempt*, along with the overhead that that brings. But, this is a much more hassle-free API for 95% of use cases and will probably be the one that most users will want to reach for.
This commit is contained in:
parent
e9c6416df6
commit
6c6a47bf22
@ -12,33 +12,33 @@ export tcp_socket, tcp_conn_port, tcp_err_data;
|
||||
// operations on a tcp_socket
|
||||
export write, read_start, read_stop;
|
||||
// tcp server stuff
|
||||
export new_listener, listen_for_conn, accept;
|
||||
export new_listener, listen_for_conn, accept, conn_recv;
|
||||
// tcp client stuff
|
||||
export connect;
|
||||
// misc util
|
||||
export is_responding;
|
||||
|
||||
#[nolink]
|
||||
native mod rustrt {
|
||||
fn rust_uv_current_kernel_malloc(size: libc::size_t) -> *libc::c_void;
|
||||
fn rust_uv_current_kernel_malloc(size: libc::c_uint) -> *libc::c_void;
|
||||
fn rust_uv_current_kernel_free(mem: *libc::c_void);
|
||||
fn rust_uv_helper_uv_tcp_t_size() -> libc::c_uint;
|
||||
}
|
||||
|
||||
#[doc="
|
||||
Encapsulates an open TCP/IP connection through libuv
|
||||
|
||||
`tcp_socket` non-sendable and handles automatically closing the underlying libuv data structures when it goes out of scope.
|
||||
`tcp_socket` is non-copyable/sendable and automagically handles closing the
|
||||
underlying libuv data structures when it goes out of scope. This is the
|
||||
data structure that is used for read/write operations over a TCP stream.
|
||||
"]
|
||||
resource tcp_socket(socket_data_wrap: @{data:*mut tcp_socket_data})
|
||||
resource tcp_socket(socket_data: @tcp_socket_data)
|
||||
unsafe {
|
||||
let closed_po = comm::port::<()>();
|
||||
let closed_ch = comm::chan(closed_po);
|
||||
let close_data = {
|
||||
closed_ch: closed_ch
|
||||
};
|
||||
let socket_data = (*socket_data_wrap).data;
|
||||
let close_data_ptr = ptr::addr_of(close_data);
|
||||
let stream_handle_ptr = ptr::addr_of((*socket_data).stream_handle);
|
||||
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
|
||||
uv::hl::interact((*socket_data).hl_loop) {|loop_ptr|
|
||||
log(debug, #fmt("interact dtor for tcp_socket stream %? loop %?",
|
||||
stream_handle_ptr, loop_ptr));
|
||||
@ -48,7 +48,8 @@ resource tcp_socket(socket_data_wrap: @{data:*mut tcp_socket_data})
|
||||
};
|
||||
comm::recv(closed_po);
|
||||
log(debug, #fmt("about to free socket_data at %?", socket_data));
|
||||
rustrt::rust_uv_current_kernel_free(socket_data as *libc::c_void);
|
||||
rustrt::rust_uv_current_kernel_free(stream_handle_ptr
|
||||
as *libc::c_void);
|
||||
log(debug, "exiting dtor for tcp_socket");
|
||||
}
|
||||
|
||||
@ -98,24 +99,24 @@ fn connect(input_ip: ip::ip_addr, port: uint)
|
||||
let conn_data_ptr = ptr::addr_of(conn_data);
|
||||
let hl_loop = uv::global_loop::get();
|
||||
let reader_po = comm::port::<result::result<[u8], tcp_err_data>>();
|
||||
let socket_data_ptr = new_socket_data();
|
||||
*socket_data_ptr = {
|
||||
let stream_handle_ptr = malloc_uv_tcp_t();
|
||||
*(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
|
||||
let socket_data = @{
|
||||
reader_po: reader_po,
|
||||
reader_ch: comm::chan(reader_po),
|
||||
stream_handle : uv::ll::tcp_t(),
|
||||
stream_handle_ptr : stream_handle_ptr,
|
||||
connect_req : uv::ll::connect_t(),
|
||||
write_req : uv::ll::write_t(),
|
||||
hl_loop: hl_loop
|
||||
};
|
||||
let socket_data_ptr = ptr::addr_of(*socket_data);
|
||||
log(debug, #fmt("tcp_connect result_ch %?", conn_data.result_ch));
|
||||
// get an unsafe representation of our stream_handle_ptr that
|
||||
// we can send into the interact cb to be handled in libuv..
|
||||
log(debug, #fmt("stream_handl_ptr outside interact %?",
|
||||
ptr::addr_of((*socket_data_ptr).stream_handle)));
|
||||
stream_handle_ptr));
|
||||
uv::hl::interact(hl_loop) {|loop_ptr|
|
||||
log(debug, "in interact cb for tcp client connect..");
|
||||
let stream_handle_ptr =
|
||||
ptr::addr_of((*socket_data_ptr).stream_handle);
|
||||
log(debug, #fmt("stream_handl_ptr in interact %?",
|
||||
stream_handle_ptr));
|
||||
alt uv::ll::tcp_init( loop_ptr, stream_handle_ptr) {
|
||||
@ -174,7 +175,7 @@ fn connect(input_ip: ip::ip_addr, port: uint)
|
||||
alt comm::recv(result_po) {
|
||||
conn_success {
|
||||
log(debug, "tcp::connect - received success on result_po");
|
||||
result::ok(tcp_socket(@{data:socket_data_ptr}))
|
||||
result::ok(tcp_socket(socket_data))
|
||||
}
|
||||
conn_failure(err_data) {
|
||||
comm::recv(closed_signal_po);
|
||||
@ -200,10 +201,10 @@ A `result` object with a `()` value, in the event of success, or a
|
||||
"]
|
||||
fn write(sock: tcp_socket, raw_write_data: [[u8]])
|
||||
-> result::result<(), tcp_err_data> unsafe {
|
||||
let socket_data_ptr = ((**sock).data);
|
||||
let socket_data_ptr = ptr::addr_of(**sock);
|
||||
let write_req_ptr = ptr::addr_of((*socket_data_ptr).write_req);
|
||||
let stream_handle_ptr =
|
||||
ptr::addr_of((*socket_data_ptr).stream_handle);
|
||||
(*socket_data_ptr).stream_handle_ptr;
|
||||
let write_buf_vec = iter::map_to_vec(raw_write_data) {|raw_bytes|
|
||||
uv::ll::buf_init(vec::unsafe::to_ptr(raw_bytes),
|
||||
vec::len(raw_bytes))
|
||||
@ -254,8 +255,8 @@ on) from until `read_stop` is called, or a `tcp_err_data` record
|
||||
fn read_start(sock: tcp_socket)
|
||||
-> result::result<comm::port<
|
||||
result::result<[u8], tcp_err_data>>, tcp_err_data> unsafe {
|
||||
let socket_data = (**sock).data;
|
||||
let stream_handle_ptr = ptr::addr_of((*socket_data).stream_handle);
|
||||
let socket_data = ptr::addr_of(**sock);
|
||||
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
|
||||
let start_po = comm::port::<option<uv::ll::uv_err_data>>();
|
||||
let start_ch = comm::chan(start_po);
|
||||
log(debug, "in tcp::read_start before interact loop");
|
||||
@ -290,8 +291,8 @@ Stop reading from an open TCP connection.
|
||||
"]
|
||||
fn read_stop(sock: tcp_socket) ->
|
||||
result::result<(), tcp_err_data> unsafe {
|
||||
let socket_data = (**sock).data;
|
||||
let stream_handle_ptr = ptr::addr_of((*socket_data).stream_handle);
|
||||
let socket_data = ptr::addr_of(**sock);
|
||||
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
|
||||
let stop_po = comm::port::<option<tcp_err_data>>();
|
||||
let stop_ch = comm::chan(stop_po);
|
||||
uv::hl::interact((*socket_data).hl_loop) {|loop_ptr|
|
||||
@ -406,6 +407,58 @@ fn new_listener(host_ip: ip::ip_addr, port: uint, backlog: uint)
|
||||
}
|
||||
}
|
||||
|
||||
#[doc="
|
||||
Block on a `net::tcp::tcp_conn_port` until a new connection arrives
|
||||
|
||||
# Arguments
|
||||
|
||||
* server_port -- a `net::tcp::tcp_conn_port` that you wish to listen
|
||||
on for an incoming connection
|
||||
|
||||
# Returns
|
||||
|
||||
A `result` object containing a `net::tcp::tcp_socket`, ready for immediate
|
||||
use, as the `ok` varient, or a `net::tcp::tcp_err_data` for the `err`
|
||||
variant
|
||||
"]
|
||||
fn conn_recv(server_port: tcp_conn_port)
|
||||
-> result::result<tcp_socket, tcp_err_data> {
|
||||
let new_conn_po = (**server_port).new_conn_po;
|
||||
let hl_loop = (**server_port).hl_loop;
|
||||
let new_conn_result = comm::recv(new_conn_po);
|
||||
alt new_conn_result {
|
||||
ok(client_stream_ptr) {
|
||||
conn_port_new_tcp_socket(client_stream_ptr, hl_loop)
|
||||
}
|
||||
err(err_data) {
|
||||
result::err(err_data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn conn_recv_spawn(server_port: tcp_conn_port,
|
||||
cb: fn~(result::result<tcp_socket, tcp_err_data>)) {
|
||||
let new_conn_po = (**server_port).new_conn_po;
|
||||
let hl_loop = (**server_port).hl_loop;
|
||||
let new_conn_result = comm::recv(new_conn_po);
|
||||
task::spawn {||
|
||||
let sock_create_result = alt new_conn_result {
|
||||
ok(client_stream_ptr) {
|
||||
conn_port_new_tcp_socket(client_stream_ptr, hl_loop)
|
||||
}
|
||||
err(err_data) {
|
||||
result::err(err_data)
|
||||
}
|
||||
};
|
||||
cb(sock_create_result);
|
||||
};
|
||||
}
|
||||
|
||||
fn conn_peek(server_port: tcp_conn_port) -> bool {
|
||||
let new_conn_po = (**server_port).new_conn_po;
|
||||
comm::peek(new_conn_po)
|
||||
}
|
||||
|
||||
#[doc="
|
||||
Bind an incoming client connection to a `net::tcp::tcp_socket`
|
||||
|
||||
@ -477,17 +530,19 @@ fn accept(new_conn: tcp_new_connection)
|
||||
server_handle_ptr) as *tcp_listen_fc_data;
|
||||
let reader_po = comm::port::<result::result<[u8], tcp_err_data>>();
|
||||
let hl_loop = (*server_data_ptr).hl_loop;
|
||||
let client_socket_data_ptr = new_socket_data();
|
||||
*client_socket_data_ptr = {
|
||||
let stream_handle_ptr = malloc_uv_tcp_t();
|
||||
*(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
|
||||
let client_socket_data = @{
|
||||
reader_po: reader_po,
|
||||
reader_ch: comm::chan(reader_po),
|
||||
stream_handle : uv::ll::tcp_t(),
|
||||
stream_handle_ptr : stream_handle_ptr,
|
||||
connect_req : uv::ll::connect_t(),
|
||||
write_req : uv::ll::write_t(),
|
||||
hl_loop: hl_loop
|
||||
};
|
||||
let client_stream_handle_ptr = ptr::addr_of(
|
||||
(*client_socket_data_ptr).stream_handle);
|
||||
let client_socket_data_ptr = ptr::addr_of(*client_socket_data);
|
||||
let client_stream_handle_ptr =
|
||||
(*client_socket_data_ptr).stream_handle_ptr;
|
||||
|
||||
let result_po = comm::port::<option<tcp_err_data>>();
|
||||
let result_ch = comm::chan(result_po);
|
||||
@ -533,59 +588,13 @@ fn accept(new_conn: tcp_new_connection)
|
||||
result::err(err_data)
|
||||
}
|
||||
none {
|
||||
result::ok(tcp_socket(@{data: client_socket_data_ptr }))
|
||||
result::ok(tcp_socket(client_socket_data))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[doc="
|
||||
Attempt to open a TCP/IP connection on a remote host
|
||||
|
||||
The connection will (attempt to) be successfully established and then
|
||||
disconnect immediately. It is useful to determine, simply, if a remote
|
||||
host is responding, and that is all.
|
||||
|
||||
# Arguments
|
||||
|
||||
* `remote_ip` - an IP address (versions 4 or 6) for the remote host
|
||||
* `remote_port` - a uint representing the port on the remote host to
|
||||
connect to
|
||||
* `timeout_msecs` - a timeout period, in miliseconds, to wait before
|
||||
aborting the connection attempt
|
||||
|
||||
# Returns
|
||||
|
||||
A `bool` indicating success or failure. If a connection was established
|
||||
to the remote host in the alloted timeout, `true` is returned. If the
|
||||
host refused the connection, timed out or had some other error condition,
|
||||
`false` is returned.
|
||||
"]
|
||||
fn is_responding(remote_ip: ip::ip_addr, remote_port: uint,
|
||||
timeout_msecs: uint) -> bool {
|
||||
log(debug, "entering is_responding");
|
||||
let connected_po = comm::port::<bool>();
|
||||
let connected_ch = comm::chan(connected_po);
|
||||
task::spawn {||
|
||||
log(debug, "in is_responding nested task");
|
||||
let connect_result = connect(remote_ip, remote_port);
|
||||
let connect_succeeded = result::is_success(connect_result);
|
||||
log(debug, #fmt("leaving is_responding nested task .. result %?",
|
||||
connect_succeeded));
|
||||
comm::send(connected_ch, connect_succeeded);
|
||||
};
|
||||
log(debug, "exiting is_responding");
|
||||
alt timer::recv_timeout(timeout_msecs, connected_po) {
|
||||
some(connect_succeeded) {
|
||||
log(debug, #fmt("connect succedded? %?", connect_succeeded));
|
||||
connect_succeeded }
|
||||
none {
|
||||
log(debug, "is_responding timed out on waiting to connect");
|
||||
false }
|
||||
}
|
||||
}
|
||||
|
||||
#[doc="
|
||||
Bind to a given IP/port and listen for new connections
|
||||
|
||||
@ -609,9 +618,9 @@ callback's arguments are:
|
||||
|
||||
# returns
|
||||
|
||||
a `result` instance containing empty data of type `()` on a successful
|
||||
or normal shutdown, and a `tcp_err_data` record in the event of listen
|
||||
exiting because of an error
|
||||
a `result` instance containing empty data of type `()` on a
|
||||
successful/normal shutdown, and a `tcp_err_data` record in the event
|
||||
of listen exiting because of an error
|
||||
"]
|
||||
fn listen_for_conn(host_ip: ip::ip_addr, port: uint, backlog: uint,
|
||||
on_establish_cb: fn~(comm::chan<option<tcp_err_data>>),
|
||||
@ -704,7 +713,36 @@ fn listen_for_conn(host_ip: ip::ip_addr, port: uint, backlog: uint,
|
||||
}
|
||||
}
|
||||
|
||||
// internal api
|
||||
// INTERNAL API
|
||||
|
||||
// various recv_* can use a tcp_conn_port can re-use this..
|
||||
fn conn_port_new_tcp_socket(
|
||||
stream_handle_ptr: *uv::ll::uv_tcp_t,
|
||||
hl_loop: uv::hl::high_level_loop)
|
||||
-> result::result<tcp_socket,tcp_err_data> unsafe {
|
||||
// tcp_nl_on_connection_cb
|
||||
let reader_po = comm::port::<result::result<[u8], tcp_err_data>>();
|
||||
let client_socket_data = @{
|
||||
reader_po : reader_po,
|
||||
reader_ch : comm::chan(reader_po),
|
||||
stream_handle_ptr : stream_handle_ptr,
|
||||
connect_req : uv::ll::connect_t(),
|
||||
write_req : uv::ll::write_t(),
|
||||
hl_loop : hl_loop
|
||||
};
|
||||
let client_socket_data_ptr = ptr::addr_of(*client_socket_data);
|
||||
comm::listen {|cont_ch|
|
||||
uv::hl::interact(hl_loop) {|loop_ptr|
|
||||
log(debug, #fmt("in interact cb 4 conn_port_new_tcp.. loop %?",
|
||||
loop_ptr));
|
||||
uv::ll::set_data_for_uv_handle(stream_handle_ptr,
|
||||
client_socket_data_ptr);
|
||||
cont_ch.send(());
|
||||
};
|
||||
cont_ch.recv()
|
||||
};
|
||||
result::ok(tcp_socket(client_socket_data))
|
||||
}
|
||||
|
||||
enum tcp_new_connection {
|
||||
new_tcp_conn(*uv::ll::uv_tcp_t)
|
||||
@ -716,7 +754,7 @@ type tcp_conn_port_data = {
|
||||
stream_closed_ch: comm::chan<()>,
|
||||
hl_loop: uv::hl::high_level_loop,
|
||||
new_conn_po: comm::port<result::result<*uv::ll::uv_tcp_t,
|
||||
tcp_err_data>>,
|
||||
tcp_err_data>>,
|
||||
new_conn_ch: comm::chan<result::result<*uv::ll::uv_tcp_t,
|
||||
tcp_err_data>>
|
||||
};
|
||||
@ -769,10 +807,9 @@ crust fn tcp_nl_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
|
||||
comm::send((*conn_data_ptr).stream_closed_ch, ());
|
||||
}
|
||||
|
||||
fn new_socket_data() -> *mut tcp_socket_data unsafe {
|
||||
fn malloc_uv_tcp_t() -> *uv::ll::uv_tcp_t unsafe {
|
||||
rustrt::rust_uv_current_kernel_malloc(
|
||||
sys::size_of::<tcp_socket_data>()) as
|
||||
*mut tcp_socket_data
|
||||
rustrt::rust_uv_helper_uv_tcp_t_size()) as *uv::ll::uv_tcp_t
|
||||
}
|
||||
|
||||
crust fn tcp_nl_on_connection_cb(server_handle_ptr: *uv::ll::uv_tcp_t,
|
||||
@ -783,19 +820,9 @@ crust fn tcp_nl_on_connection_cb(server_handle_ptr: *uv::ll::uv_tcp_t,
|
||||
let loop_ptr = uv::ll::get_loop_for_uv_handle(server_handle_ptr);
|
||||
alt status {
|
||||
0i32 {
|
||||
let hl_loop = (*server_data_ptr).hl_loop;
|
||||
let reader_po = comm::port::<result::result<[u8], tcp_err_data>>();
|
||||
let client_socket_data_ptr = new_socket_data();
|
||||
*client_socket_data_ptr = {
|
||||
reader_po: reader_po,
|
||||
reader_ch: comm::chan(reader_po),
|
||||
stream_handle : uv::ll::tcp_t(),
|
||||
connect_req : uv::ll::connect_t(),
|
||||
write_req : uv::ll::write_t(),
|
||||
hl_loop: hl_loop
|
||||
};
|
||||
let client_stream_handle_ptr = ptr::addr_of(
|
||||
(*client_socket_data_ptr).stream_handle);
|
||||
let client_stream_handle_ptr = malloc_uv_tcp_t();
|
||||
*(client_stream_handle_ptr as *mut uv::ll::uv_tcp_t) =
|
||||
uv::ll::tcp_t();
|
||||
alt uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) {
|
||||
0i32 {
|
||||
log(debug, "uv_tcp_init successful for client stream");
|
||||
@ -804,9 +831,6 @@ crust fn tcp_nl_on_connection_cb(server_handle_ptr: *uv::ll::uv_tcp_t,
|
||||
client_stream_handle_ptr as *libc::c_void) {
|
||||
0i32 {
|
||||
log(debug, "successfully accepted client connection");
|
||||
uv::ll::set_data_for_uv_handle(client_stream_handle_ptr,
|
||||
client_socket_data_ptr as
|
||||
*libc::c_void);
|
||||
comm::send(new_conn_ch,
|
||||
result::ok(client_stream_handle_ptr));
|
||||
}
|
||||
@ -1002,7 +1026,7 @@ enum conn_attempt {
|
||||
type tcp_socket_data = {
|
||||
reader_po: comm::port<result::result<[u8], tcp_err_data>>,
|
||||
reader_ch: comm::chan<result::result<[u8], tcp_err_data>>,
|
||||
stream_handle: uv::ll::uv_tcp_t,
|
||||
stream_handle_ptr: *uv::ll::uv_tcp_t,
|
||||
connect_req: uv::ll::uv_connect_t,
|
||||
write_req: uv::ll::uv_write_t,
|
||||
hl_loop: uv::hl::high_level_loop
|
||||
@ -1035,6 +1059,10 @@ mod test {
|
||||
fn test_gl_tcp_server_and_client_ipv4() unsafe {
|
||||
impl_gl_tcp_ipv4_server_and_client();
|
||||
}
|
||||
#[test]
|
||||
fn test_gl_tcp_server_listener_and_client_ipv4() unsafe {
|
||||
impl_gl_tcp_ipv4_server_listener_and_client();
|
||||
}
|
||||
}
|
||||
#[cfg(target_arch="x86")]
|
||||
mod impl32 {
|
||||
@ -1043,6 +1071,11 @@ mod test {
|
||||
fn test_gl_tcp_server_and_client_ipv4() unsafe {
|
||||
impl_gl_tcp_ipv4_server_and_client();
|
||||
}
|
||||
#[test]
|
||||
#[ignore(cfg(target_os = "linux"))]
|
||||
fn test_gl_tcp_server_listener_and_client_ipv4() unsafe {
|
||||
impl_gl_tcp_ipv4_server_listener_and_client();
|
||||
}
|
||||
}
|
||||
}
|
||||
fn impl_gl_tcp_ipv4_server_and_client() {
|
||||
@ -1086,6 +1119,47 @@ mod test {
|
||||
assert str::contains(actual_req, expected_req);
|
||||
assert str::contains(actual_resp, expected_resp);
|
||||
}
|
||||
fn impl_gl_tcp_ipv4_server_listener_and_client() {
|
||||
let server_ip = "127.0.0.1";
|
||||
let server_port = 8889u;
|
||||
let expected_req = "ping";
|
||||
let expected_resp = "pong";
|
||||
|
||||
let server_result_po = comm::port::<str>();
|
||||
let server_result_ch = comm::chan(server_result_po);
|
||||
|
||||
let cont_po = comm::port::<()>();
|
||||
let cont_ch = comm::chan(cont_po);
|
||||
// server
|
||||
task::spawn_sched(task::manual_threads(1u)) {||
|
||||
let actual_req = comm::listen {|server_ch|
|
||||
run_tcp_test_server_listener(
|
||||
server_ip,
|
||||
server_port,
|
||||
expected_resp,
|
||||
server_ch,
|
||||
cont_ch)
|
||||
};
|
||||
server_result_ch.send(actual_req);
|
||||
};
|
||||
comm::recv(cont_po);
|
||||
// client
|
||||
log(debug, "server started, firing up client..");
|
||||
let actual_resp = comm::listen {|client_ch|
|
||||
run_tcp_test_client(
|
||||
server_ip,
|
||||
server_port,
|
||||
expected_req,
|
||||
client_ch)
|
||||
};
|
||||
let actual_req = comm::recv(server_result_po);
|
||||
log(debug, #fmt("REQ: expected: '%s' actual: '%s'",
|
||||
expected_req, actual_req));
|
||||
log(debug, #fmt("RESP: expected: '%s' actual: '%s'",
|
||||
expected_resp, actual_resp));
|
||||
assert str::contains(actual_req, expected_req);
|
||||
assert str::contains(actual_resp, expected_resp);
|
||||
}
|
||||
|
||||
fn run_tcp_test_server(server_ip: str, server_port: uint, resp: str,
|
||||
server_ch: comm::chan<str>,
|
||||
@ -1161,7 +1235,59 @@ mod test {
|
||||
log(debug, #fmt("SERVER: exited and got ret val: '%s'", ret_val));
|
||||
ret_val
|
||||
}
|
||||
|
||||
|
||||
fn run_tcp_test_server_listener(server_ip: str,
|
||||
server_port: uint, resp: str,
|
||||
server_ch: comm::chan<str>,
|
||||
cont_ch: comm::chan<()>) -> str {
|
||||
|
||||
task::spawn_sched(task::manual_threads(1u)) {||
|
||||
let server_ip_addr = ip::v4::parse_addr(server_ip);
|
||||
let new_listener_result =
|
||||
new_listener(server_ip_addr, server_port, 128u);
|
||||
if result::is_failure(new_listener_result) {
|
||||
let err_data = result::get_err(new_listener_result);
|
||||
log(debug, #fmt("SERVER: exited abnormally name %s msg %s",
|
||||
err_data.err_name, err_data.err_msg));
|
||||
fail "couldn't set up new listener";
|
||||
}
|
||||
let server_port = result::unwrap(new_listener_result);
|
||||
cont_ch.send(());
|
||||
// receive a single new connection.. normally this'd be
|
||||
// in a loop {}, but we're just going to take a single
|
||||
// client.. get their req, write a resp and then exit
|
||||
let new_conn_result = conn_recv(server_port);
|
||||
if result::is_failure(new_conn_result) {
|
||||
let err_data = result::get_err(new_conn_result);
|
||||
log(debug, #fmt("SERVER: exited abnormally name %s msg %s",
|
||||
err_data.err_name, err_data.err_msg));
|
||||
fail "couldn't recv new conn";
|
||||
}
|
||||
else {
|
||||
let sock = result::unwrap(new_conn_result);
|
||||
log(debug, "SERVER: successfully accepted"+
|
||||
"connection!");
|
||||
let received_req_bytes =
|
||||
tcp_read_single(sock);
|
||||
alt received_req_bytes {
|
||||
result::ok(data) {
|
||||
server_ch.send(
|
||||
str::from_bytes(data));
|
||||
log(debug, "SERVER: before write");
|
||||
tcp_write_single(sock, str::bytes(resp));
|
||||
log(debug, "SERVER: after write.. die");
|
||||
}
|
||||
result::err(err_data) {
|
||||
server_ch.send("");
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
let ret_val = server_ch.recv();
|
||||
log(debug, #fmt("SERVER: exited and got ret val: '%s'", ret_val));
|
||||
ret_val
|
||||
}
|
||||
|
||||
fn run_tcp_test_client(server_ip: str, server_port: uint, resp: str,
|
||||
client_ch: comm::chan<str>) -> str {
|
||||
|
||||
|
@ -453,10 +453,10 @@ rust_uv_get_kernel_global_chan_ptr() {
|
||||
|
||||
extern "C" void*
|
||||
rust_uv_current_kernel_malloc(size_t size) {
|
||||
return current_kernel_malloc(size, "rust_uv_current_kernel_malloc");
|
||||
return current_kernel_malloc(size, "rust_uv_current_kernel_malloc");
|
||||
}
|
||||
|
||||
extern "C" void
|
||||
rust_uv_current_kernel_free(void* mem) {
|
||||
current_kernel_free(mem);
|
||||
current_kernel_free(mem);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user