std: impl for high-level tcp client/request workflow
This commit is contained in:
parent
7e114b200a
commit
565c5d694a
@ -4,7 +4,7 @@
|
||||
|
||||
import ip = net_ip;
|
||||
|
||||
export tcp_connect_result, tcp_write_result;
|
||||
export tcp_connect_result, tcp_write_result, tcp_read_start_result;
|
||||
export connect, write;
|
||||
|
||||
resource tcp_socket(socket_data: @tcp_socket_data) unsafe {
|
||||
@ -36,6 +36,17 @@ enum tcp_write_result {
|
||||
tcp_write_error(uv::ll::uv_err_data)
|
||||
}
|
||||
|
||||
enum tcp_read_start_result {
|
||||
tcp_read_start_success(comm::port<tcp_read_result>),
|
||||
tcp_read_start_error(uv::ll::uv_err_data)
|
||||
}
|
||||
|
||||
enum tcp_read_result {
|
||||
tcp_read_data([u8]),
|
||||
tcp_read_done,
|
||||
tcp_read_err(uv::ll::uv_err_data)
|
||||
}
|
||||
|
||||
#[doc="
|
||||
Initiate a client connection over TCP/IP
|
||||
|
||||
@ -58,8 +69,10 @@ fn connect(input_ip: ip::ip_addr, port: uint) -> tcp_connect_result unsafe {
|
||||
};
|
||||
let conn_data_ptr = ptr::addr_of(conn_data);
|
||||
let hl_loop = uv::global_loop::get();
|
||||
let reader_po = comm::port::<tcp_read_result>();
|
||||
let socket_data = @{
|
||||
reader_port: comm::port::<[u8]>(),
|
||||
reader_po: reader_po,
|
||||
reader_ch: comm::chan(reader_po),
|
||||
stream_handle : uv::ll::tcp_t(),
|
||||
connect_req : uv::ll::connect_t(),
|
||||
write_req : uv::ll::write_t(),
|
||||
@ -183,9 +196,99 @@ fn write(sock: tcp_socket, raw_write_data: [[u8]]) -> tcp_write_result
|
||||
comm::recv(result_po)
|
||||
}
|
||||
|
||||
#[doc="
|
||||
"]
|
||||
fn read_start(sock: tcp_socket) -> tcp_read_start_result unsafe {
|
||||
let stream_handle_ptr = ptr::addr_of((**sock).stream_handle);
|
||||
let start_po = comm::port::<option<uv::ll::uv_err_data>>();
|
||||
let start_ch = comm::chan(start_po);
|
||||
uv::hl::interact((**sock).hl_loop) {|loop_ptr|
|
||||
log(debug, #fmt("in tcp::read_start interact cb %?", loop_ptr));
|
||||
alt uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t,
|
||||
on_alloc_cb,
|
||||
on_tcp_read_cb) {
|
||||
0i32 {
|
||||
log(debug, "success doing uv_read_start");
|
||||
comm::send(start_ch, none);
|
||||
}
|
||||
_ {
|
||||
log(debug, "error attempting uv_read_start");
|
||||
let err_data = uv::ll::get_last_err_data(loop_ptr);
|
||||
comm::send(start_ch, some(err_data));
|
||||
}
|
||||
}
|
||||
};
|
||||
alt comm::recv(start_po) {
|
||||
some(err_data) {
|
||||
tcp_read_start_error(err_data)
|
||||
}
|
||||
none {
|
||||
tcp_read_start_success((**sock).reader_po)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn read_stop(sock: tcp_socket) -> option<uv::ll::uv_err_data> unsafe {
|
||||
let stream_handle_ptr = ptr::addr_of((**sock).stream_handle);
|
||||
let stop_po = comm::port::<option<uv::ll::uv_err_data>>();
|
||||
let stop_ch = comm::chan(stop_po);
|
||||
uv::hl::interact((**sock).hl_loop) {|loop_ptr|
|
||||
log(debug, "in interact cb for tcp::read_stop");
|
||||
alt uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
|
||||
0i32 {
|
||||
log(debug, "successfully called uv_read_stop");
|
||||
comm::send(stop_ch, none);
|
||||
}
|
||||
_ {
|
||||
log(debug, "failure in calling uv_read_stop");
|
||||
let err_data = uv::ll::get_last_err_data(loop_ptr);
|
||||
comm::send(stop_ch, some(err_data));
|
||||
}
|
||||
}
|
||||
};
|
||||
comm::recv(stop_po)
|
||||
}
|
||||
|
||||
// INTERNAL API
|
||||
crust fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
|
||||
nread: libc::ssize_t,
|
||||
++buf: uv::ll::uv_buf_t) unsafe {
|
||||
let loop_ptr = uv::ll::get_loop_for_uv_handle(stream);
|
||||
let socket_data_ptr = uv::ll::get_data_for_uv_handle(stream)
|
||||
as *tcp_socket_data;
|
||||
let reader_ch = (*socket_data_ptr).reader_ch;
|
||||
alt nread {
|
||||
// incoming err.. probably eof
|
||||
-1 {
|
||||
let err_data = uv::ll::get_last_err_data(loop_ptr);
|
||||
comm::send(reader_ch, tcp_read_err(err_data));
|
||||
}
|
||||
// do nothing .. unneeded buf
|
||||
0 {}
|
||||
// have data
|
||||
_ {
|
||||
// we have data
|
||||
log(debug, #fmt("tcp on_read_cb nread: %d", nread));
|
||||
let buf_base = uv::ll::get_base_from_buf(buf);
|
||||
let buf_len = uv::ll::get_len_from_buf(buf);
|
||||
let new_bytes = vec::unsafe::from_buf(buf_base, buf_len);
|
||||
comm::send(reader_ch, tcp_read_data(new_bytes));
|
||||
}
|
||||
}
|
||||
uv::ll::free_base_of_buf(buf);
|
||||
}
|
||||
|
||||
crust fn on_alloc_cb(handle: *libc::c_void,
|
||||
++suggested_size: libc::size_t)
|
||||
-> uv::ll::uv_buf_t unsafe {
|
||||
log(debug, "tcp read on_alloc_cb!");
|
||||
let char_ptr = uv::ll::malloc_buf_base_of(suggested_size);
|
||||
log(debug, #fmt("tcp read on_alloc_cb h: %? char_ptr: %u sugsize: %u",
|
||||
handle,
|
||||
char_ptr as uint,
|
||||
suggested_size as uint));
|
||||
uv::ll::buf_init(char_ptr, suggested_size)
|
||||
}
|
||||
|
||||
type tcp_socket_close_data = {
|
||||
closed_ch: comm::chan<()>
|
||||
@ -272,9 +375,9 @@ enum conn_attempt {
|
||||
conn_failure(uv::ll::uv_err_data)
|
||||
}
|
||||
|
||||
|
||||
type tcp_socket_data = {
|
||||
reader_port: comm::port<[u8]>,
|
||||
reader_po: comm::port<tcp_read_result>,
|
||||
reader_ch: comm::chan<tcp_read_result>,
|
||||
stream_handle: uv::ll::uv_tcp_t,
|
||||
connect_req: uv::ll::uv_connect_t,
|
||||
write_req: uv::ll::uv_write_t,
|
||||
@ -310,30 +413,52 @@ fn test_gl_tcp_ipv4_request() {
|
||||
alt write(sock, write_data) {
|
||||
tcp_write_success {
|
||||
log(debug, "tcp::write successful");
|
||||
/*
|
||||
let mut total_read_data: [u8] = [];
|
||||
let reader_po = read_start(sock);
|
||||
loop {
|
||||
alt comm::recv(reader_po) {
|
||||
new_read_data(data) {
|
||||
total_read_data += data;
|
||||
// theoretically, we could keep iterating, here, if
|
||||
// we expect the server on the other end to keep
|
||||
// streaming/chunking data to us, but..
|
||||
read_stop(tcp_stream);
|
||||
break;
|
||||
}
|
||||
done_reading {
|
||||
break;
|
||||
}
|
||||
error {
|
||||
fail "erroring occured during read attempt.."
|
||||
+ "FIXME need info";
|
||||
}
|
||||
alt read_start(sock) {
|
||||
tcp_read_start_success(reader_po) {
|
||||
loop {
|
||||
alt comm::recv(reader_po) {
|
||||
tcp_read_data(new_data) {
|
||||
total_read_data += new_data;
|
||||
// theoretically, we could keep iterating, if
|
||||
// we expect the server on the other end to keep
|
||||
// streaming/chunking data to us, but..
|
||||
alt read_stop(sock) {
|
||||
some(err_data) {
|
||||
log(debug, "error while calling read_stop");
|
||||
log(debug, #fmt("read_stop error: %? %?",
|
||||
err_data.err_name,
|
||||
err_data.err_msg));
|
||||
assert false;
|
||||
}
|
||||
none {
|
||||
// exiting the read loop
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
tcp_read_done {
|
||||
break;
|
||||
}
|
||||
tcp_read_err(err_data) {
|
||||
log(debug, "read error data recv'd");
|
||||
log(debug, #fmt("read error: %? %?",
|
||||
err_data.err_name,
|
||||
err_data.err_msg));
|
||||
assert false;
|
||||
}
|
||||
}
|
||||
}
|
||||
comm::send(data_ch, total_read_data);
|
||||
}
|
||||
tcp_read_start_error(err_data) {
|
||||
log(debug, "tcp_read_start_error received..");
|
||||
log(debug, #fmt("tcp read_start error: %? %?",
|
||||
err_data.err_name,
|
||||
err_data.err_msg));
|
||||
assert false;
|
||||
}
|
||||
}
|
||||
comm::send(data_ch, total_read_data);
|
||||
*/
|
||||
}
|
||||
tcp_write_error(err_data) {
|
||||
log(debug, "tcp_write_error received..");
|
||||
|
Loading…
Reference in New Issue
Block a user