From 565c5d694a51882fbbe6f1ebba370682c15bfbe8 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Tue, 1 May 2012 18:27:07 -0700 Subject: [PATCH] std: impl for high-level tcp client/request workflow --- src/libstd/net_tcp.rs | 175 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 150 insertions(+), 25 deletions(-) diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index 7f20af65366..c75afc7e021 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -4,7 +4,7 @@ High-level interface to libuv's TCP functionality import ip = net_ip; -export tcp_connect_result, tcp_write_result; +export tcp_connect_result, tcp_write_result, tcp_read_start_result; export connect, write; resource tcp_socket(socket_data: @tcp_socket_data) unsafe { @@ -36,6 +36,17 @@ enum tcp_write_result { tcp_write_error(uv::ll::uv_err_data) } +enum tcp_read_start_result { + tcp_read_start_success(comm::port), + tcp_read_start_error(uv::ll::uv_err_data) +} + +enum tcp_read_result { + tcp_read_data([u8]), + tcp_read_done, + tcp_read_err(uv::ll::uv_err_data) +} + #[doc=" Initiate a client connection over TCP/IP @@ -58,8 +69,10 @@ fn connect(input_ip: ip::ip_addr, port: uint) -> tcp_connect_result unsafe { }; let conn_data_ptr = ptr::addr_of(conn_data); let hl_loop = uv::global_loop::get(); + let reader_po = comm::port::(); let socket_data = @{ - reader_port: comm::port::<[u8]>(), + reader_po: reader_po, + reader_ch: comm::chan(reader_po), stream_handle : uv::ll::tcp_t(), connect_req : uv::ll::connect_t(), write_req : uv::ll::write_t(), @@ -183,9 +196,99 @@ fn write(sock: tcp_socket, raw_write_data: [[u8]]) -> tcp_write_result comm::recv(result_po) } +#[doc=" +"] +fn read_start(sock: tcp_socket) -> tcp_read_start_result unsafe { + let stream_handle_ptr = ptr::addr_of((**sock).stream_handle); + let start_po = comm::port::>(); + let start_ch = comm::chan(start_po); + uv::hl::interact((**sock).hl_loop) {|loop_ptr| + log(debug, #fmt("in tcp::read_start interact cb %?", loop_ptr)); + alt uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t, + on_alloc_cb, + on_tcp_read_cb) { + 0i32 { + log(debug, "success doing uv_read_start"); + comm::send(start_ch, none); + } + _ { + log(debug, "error attempting uv_read_start"); + let err_data = uv::ll::get_last_err_data(loop_ptr); + comm::send(start_ch, some(err_data)); + } + } + }; + alt comm::recv(start_po) { + some(err_data) { + tcp_read_start_error(err_data) + } + none { + tcp_read_start_success((**sock).reader_po) + } + } +} +fn read_stop(sock: tcp_socket) -> option unsafe { + let stream_handle_ptr = ptr::addr_of((**sock).stream_handle); + let stop_po = comm::port::>(); + let stop_ch = comm::chan(stop_po); + uv::hl::interact((**sock).hl_loop) {|loop_ptr| + log(debug, "in interact cb for tcp::read_stop"); + alt uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) { + 0i32 { + log(debug, "successfully called uv_read_stop"); + comm::send(stop_ch, none); + } + _ { + log(debug, "failure in calling uv_read_stop"); + let err_data = uv::ll::get_last_err_data(loop_ptr); + comm::send(stop_ch, some(err_data)); + } + } + }; + comm::recv(stop_po) +} // INTERNAL API +crust fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t, + nread: libc::ssize_t, + ++buf: uv::ll::uv_buf_t) unsafe { + let loop_ptr = uv::ll::get_loop_for_uv_handle(stream); + let socket_data_ptr = uv::ll::get_data_for_uv_handle(stream) + as *tcp_socket_data; + let reader_ch = (*socket_data_ptr).reader_ch; + alt nread { + // incoming err.. probably eof + -1 { + let err_data = uv::ll::get_last_err_data(loop_ptr); + comm::send(reader_ch, tcp_read_err(err_data)); + } + // do nothing .. unneeded buf + 0 {} + // have data + _ { + // we have data + log(debug, #fmt("tcp on_read_cb nread: %d", nread)); + let buf_base = uv::ll::get_base_from_buf(buf); + let buf_len = uv::ll::get_len_from_buf(buf); + let new_bytes = vec::unsafe::from_buf(buf_base, buf_len); + comm::send(reader_ch, tcp_read_data(new_bytes)); + } + } + uv::ll::free_base_of_buf(buf); +} + +crust fn on_alloc_cb(handle: *libc::c_void, + ++suggested_size: libc::size_t) + -> uv::ll::uv_buf_t unsafe { + log(debug, "tcp read on_alloc_cb!"); + let char_ptr = uv::ll::malloc_buf_base_of(suggested_size); + log(debug, #fmt("tcp read on_alloc_cb h: %? char_ptr: %u sugsize: %u", + handle, + char_ptr as uint, + suggested_size as uint)); + uv::ll::buf_init(char_ptr, suggested_size) +} type tcp_socket_close_data = { closed_ch: comm::chan<()> @@ -272,9 +375,9 @@ enum conn_attempt { conn_failure(uv::ll::uv_err_data) } - type tcp_socket_data = { - reader_port: comm::port<[u8]>, + reader_po: comm::port, + reader_ch: comm::chan, stream_handle: uv::ll::uv_tcp_t, connect_req: uv::ll::uv_connect_t, write_req: uv::ll::uv_write_t, @@ -310,30 +413,52 @@ mod test { alt write(sock, write_data) { tcp_write_success { log(debug, "tcp::write successful"); - /* let mut total_read_data: [u8] = []; - let reader_po = read_start(sock); - loop { - alt comm::recv(reader_po) { - new_read_data(data) { - total_read_data += data; - // theoretically, we could keep iterating, here, if - // we expect the server on the other end to keep - // streaming/chunking data to us, but.. - read_stop(tcp_stream); - break; - } - done_reading { - break; - } - error { - fail "erroring occured during read attempt.." - + "FIXME need info"; - } + alt read_start(sock) { + tcp_read_start_success(reader_po) { + loop { + alt comm::recv(reader_po) { + tcp_read_data(new_data) { + total_read_data += new_data; + // theoretically, we could keep iterating, if + // we expect the server on the other end to keep + // streaming/chunking data to us, but.. + alt read_stop(sock) { + some(err_data) { + log(debug, "error while calling read_stop"); + log(debug, #fmt("read_stop error: %? %?", + err_data.err_name, + err_data.err_msg)); + assert false; + } + none { + // exiting the read loop + break; + } + } + } + tcp_read_done { + break; + } + tcp_read_err(err_data) { + log(debug, "read error data recv'd"); + log(debug, #fmt("read error: %? %?", + err_data.err_name, + err_data.err_msg)); + assert false; + } + } } + comm::send(data_ch, total_read_data); + } + tcp_read_start_error(err_data) { + log(debug, "tcp_read_start_error received.."); + log(debug, #fmt("tcp read_start error: %? %?", + err_data.err_name, + err_data.err_msg)); + assert false; + } } - comm::send(data_ch, total_read_data); - */ } tcp_write_error(err_data) { log(debug, "tcp_write_error received..");