wired up uv_read_start and some helper funcs around uv_alloc_cb tasks
This commit is contained in:
parent
e0193dac6e
commit
877747d0ac
146
src/libstd/uv.rs
146
src/libstd/uv.rs
@ -285,6 +285,10 @@ fn rust_uv_tcp_connect(connect_ptr: *uv_connect_t,
|
||||
fn rust_uv_write(req: *libc::c_void, stream: *libc::c_void,
|
||||
++buf_in: *uv_buf_t, buf_cnt: libc::c_int,
|
||||
cb: *u8) -> libc::c_int;
|
||||
fn rust_uv_read_start(stream: *libc::c_void, on_alloc: *u8,
|
||||
on_read: *u8) -> libc::c_int;
|
||||
fn rust_uv_malloc_buf_base_of(sug_size: libc::size_t) -> *u8;
|
||||
fn rust_uv_free_base_of_buf(++buf: uv_buf_t);
|
||||
|
||||
// sizeof testing helpers
|
||||
fn rust_uv_helper_uv_tcp_t_size() -> libc::c_uint;
|
||||
@ -295,7 +299,11 @@ fn rust_uv_write(req: *libc::c_void, stream: *libc::c_void,
|
||||
fn rust_uv_helper_sockaddr_in_size() -> libc::c_uint;
|
||||
|
||||
// data accessors for rust-mapped uv structs
|
||||
fn rust_uv_get_stream_handle_for_connect(connect: *uv_connect_t)
|
||||
fn rust_uv_get_stream_handle_from_connect_req(
|
||||
connect_req: *uv_connect_t)
|
||||
-> *uv_stream_t;
|
||||
fn rust_uv_get_stream_handle_from_write_req(
|
||||
write_req: *uv_write_t)
|
||||
-> *uv_stream_t;
|
||||
fn rust_uv_get_loop_for_uv_handle(handle: *libc::c_void)
|
||||
-> *libc::c_void;
|
||||
@ -347,6 +355,11 @@ unsafe fn write(req: *libc::c_void, stream: *libc::c_void,
|
||||
let buf_cnt = vec::len(*buf_in) as i32;
|
||||
ret rustrt::rust_uv_write(req, stream, buf_ptr, buf_cnt, cb);
|
||||
}
|
||||
unsafe fn read_start(stream: *uv_stream_t, on_alloc: *u8,
|
||||
on_read: *u8) -> libc::c_int {
|
||||
ret rustrt::rust_uv_read_start(stream as *libc::c_void,
|
||||
on_alloc, on_read);
|
||||
}
|
||||
|
||||
unsafe fn uv_last_error(loop_handle: *libc::c_void) -> uv_err_t {
|
||||
ret rustrt::rust_uv_last_error(loop_handle);
|
||||
@ -366,9 +379,16 @@ unsafe fn get_loop_for_uv_handle(handle: *libc::c_void)
|
||||
-> *libc::c_void {
|
||||
ret rustrt::rust_uv_get_loop_for_uv_handle(handle);
|
||||
}
|
||||
unsafe fn get_stream_handle_for_connect(connect: *uv_connect_t)
|
||||
unsafe fn get_stream_handle_from_connect_req(connect: *uv_connect_t)
|
||||
-> *uv_stream_t {
|
||||
ret rustrt::rust_uv_get_stream_handle_for_connect(connect);
|
||||
ret rustrt::rust_uv_get_stream_handle_from_connect_req(
|
||||
connect);
|
||||
}
|
||||
unsafe fn get_stream_handle_from_write_req(
|
||||
write_req: *uv_write_t)
|
||||
-> *uv_stream_t {
|
||||
ret rustrt::rust_uv_get_stream_handle_from_write_req(
|
||||
write_req);
|
||||
}
|
||||
|
||||
unsafe fn get_data_for_req(req: *libc::c_void) -> *libc::c_void {
|
||||
@ -392,10 +412,12 @@ unsafe fn ip4_addr(ip: str, port: int)
|
||||
ret rustrt::rust_uv_ip4_addr(addr_vec_ptr,
|
||||
port as libc::c_int);
|
||||
}
|
||||
// this is lame.
|
||||
// TODO: see github issue #1402
|
||||
unsafe fn free_1402(ptr: *libc::c_void) {
|
||||
rustrt::rust_uv_free(ptr);
|
||||
unsafe fn malloc_buf_base_of(suggested_size: libc::size_t)
|
||||
-> *u8 {
|
||||
ret rustrt::rust_uv_malloc_buf_base_of(suggested_size);
|
||||
}
|
||||
unsafe fn free_base_of_buf(buf: uv_buf_t) {
|
||||
rustrt::rust_uv_free_base_of_buf(buf);
|
||||
}
|
||||
}
|
||||
|
||||
@ -948,42 +970,80 @@ fn test_uv_timer() {
|
||||
|
||||
// BEGIN TCP REQUEST TEST SUITE
|
||||
|
||||
enum tcp_read_data {
|
||||
tcp_read_eof,
|
||||
tcp_read_more([u8]),
|
||||
tcp_read_error
|
||||
}
|
||||
|
||||
type request_wrapper = {
|
||||
write_req: *uv_write_t,
|
||||
req_buf: *[uv_buf_t]
|
||||
req_buf: *[uv_buf_t],
|
||||
read_chan: comm::chan<tcp_read_data>
|
||||
};
|
||||
|
||||
crust fn on_write_complete_cb(write_handle: *uv_write_t,
|
||||
crust fn on_alloc_cb(handle: *libc::c_void,
|
||||
suggested_size: libc::size_t) -> uv_buf_t
|
||||
unsafe {
|
||||
io::println("on_alloc_cb!");
|
||||
let char_ptr = direct::malloc_buf_base_of(suggested_size);
|
||||
ret direct::buf_init(char_ptr, suggested_size);
|
||||
}
|
||||
|
||||
// do I need the explicit copy on the buf param?
|
||||
crust fn on_read_cb(stream: *uv_stream_t, nread: libc::ssize_t,
|
||||
++buf: uv_buf_t) unsafe {
|
||||
if (nread > 0) {
|
||||
// we have data
|
||||
io::println(#fmt("read: data! nread: %d", nread));
|
||||
}
|
||||
else if (nread == -1) {
|
||||
// err .. possibly EOF
|
||||
io::println("read: eof!");
|
||||
}
|
||||
else {
|
||||
// nread == 0 .. do nothing, just free buf as below
|
||||
io::println("read: do nothing!");
|
||||
}
|
||||
// when we're done
|
||||
direct::free_base_of_buf(buf);
|
||||
io::println("exiting on_read_cb");
|
||||
}
|
||||
|
||||
crust fn on_write_complete_cb(write_req: *uv_write_t,
|
||||
status: libc::c_int) unsafe {
|
||||
io::println(#fmt("beginning on_write_complete_cb status: %d",
|
||||
status as int));
|
||||
io::println("ending on_write_complete_cb");
|
||||
let stream = direct::get_stream_handle_from_write_req(write_req);
|
||||
io::println(#fmt("on_write_complete_cb: tcp stream: %d write_handle addr %d",
|
||||
stream as int, write_req as int));
|
||||
let result = direct::read_start(stream, on_alloc_cb, on_read_cb);
|
||||
io::println(#fmt("ending on_write_complete_cb .. uv_read_start status: %d", result as int));
|
||||
}
|
||||
|
||||
crust fn on_connect_cb(connect_handle_ptr: *uv_connect_t,
|
||||
crust fn on_connect_cb(connect_req_ptr: *uv_connect_t,
|
||||
status: libc::c_int) unsafe {
|
||||
io::println(#fmt("beginning on_connect_cb .. status: %d",
|
||||
status as int));
|
||||
let stream = direct::get_stream_handle_for_connect(connect_handle_ptr);
|
||||
let stream =
|
||||
direct::get_stream_handle_from_connect_req(connect_req_ptr);
|
||||
if (status == 0i32) {
|
||||
io::println("on_connect_cb: in status=0 if..");
|
||||
let data = direct::get_data_for_req(
|
||||
connect_handle_ptr as *libc::c_void)
|
||||
let client_data = direct::get_data_for_req(
|
||||
connect_req_ptr as *libc::c_void)
|
||||
as *request_wrapper;
|
||||
let write_handle = (*data).write_req as *libc::c_void;
|
||||
let write_handle = (*client_data).write_req as *libc::c_void;
|
||||
io::println(#fmt("on_connect_cb: tcp stream: %d write_handle addr %d",
|
||||
stream as int, write_handle as int));
|
||||
let write_result = direct::write(write_handle,
|
||||
stream as *libc::c_void,
|
||||
(*data).req_buf,
|
||||
(*client_data).req_buf,
|
||||
on_write_complete_cb);
|
||||
io::println(#fmt("on_connect_cb: direct::write() status: %d",
|
||||
write_result as int));
|
||||
}
|
||||
else {
|
||||
let loop_handle = direct::get_loop_for_uv_handle(
|
||||
stream as *libc::c_void);
|
||||
let err = direct::uv_last_error(loop_handle);
|
||||
io::println("non-zero status for on_connect_cb..");
|
||||
}
|
||||
io::println("finishing on_connect_cb");
|
||||
}
|
||||
@ -993,7 +1053,7 @@ fn impl_uv_tcp_request() unsafe {
|
||||
let tcp_handle = direct::tcp_t();
|
||||
let tcp_handle_ptr = ptr::addr_of(tcp_handle);
|
||||
let connect_handle = direct::connect_t();
|
||||
let connect_handle_ptr = ptr::addr_of(connect_handle);
|
||||
let connect_req_ptr = ptr::addr_of(connect_handle);
|
||||
|
||||
// this is the persistent payload of data that we
|
||||
// need to pass around to get this example to work.
|
||||
@ -1013,8 +1073,11 @@ fn impl_uv_tcp_request() unsafe {
|
||||
io::println(#fmt("tcp req: tcp stream: %d write_handle: %d",
|
||||
tcp_handle_ptr as int,
|
||||
write_handle_ptr as int));
|
||||
let req = { writer_handle: write_handle_ptr,
|
||||
req_buf: ptr::addr_of(req_msg) };
|
||||
let read_port = comm::port::<tcp_read_data>();
|
||||
let read_chan = comm::chan::<tcp_read_data>(read_port);
|
||||
let client_data = { writer_handle: write_handle_ptr,
|
||||
req_buf: ptr::addr_of(req_msg),
|
||||
read_chan: read_chan };
|
||||
|
||||
let tcp_init_result = direct::tcp_init(
|
||||
test_loop as *libc::c_void, tcp_handle_ptr);
|
||||
@ -1030,18 +1093,48 @@ fn impl_uv_tcp_request() unsafe {
|
||||
|
||||
// this should set up the connection request..
|
||||
let tcp_connect_result = direct::tcp_connect(
|
||||
connect_handle_ptr, tcp_handle_ptr,
|
||||
connect_req_ptr, tcp_handle_ptr,
|
||||
addr, on_connect_cb);
|
||||
if (tcp_connect_result == 0i32) {
|
||||
// not set the data on the connect_req
|
||||
// until its initialized
|
||||
direct::set_data_for_req(
|
||||
connect_handle_ptr as *libc::c_void,
|
||||
ptr::addr_of(req) as *libc::c_void);
|
||||
connect_req_ptr as *libc::c_void,
|
||||
ptr::addr_of(client_data) as *libc::c_void);
|
||||
io::println("before run tcp req loop");
|
||||
direct::run(test_loop);
|
||||
io::println("after run tcp req loop");
|
||||
// TODO: see github issue #1402
|
||||
|
||||
// now we read from the port to get data
|
||||
let mut read_bytes: [u8] = [0u8];
|
||||
let mut more_data = true;
|
||||
while(more_data) {
|
||||
alt comm::recv(read_port) {
|
||||
tcp_read_eof {
|
||||
more_data = false;
|
||||
}
|
||||
tcp_read_more(new_bytes) {
|
||||
if (vec::len(read_bytes) == 1u &&
|
||||
read_bytes[0] == 0u8) {
|
||||
// the "first" read.. replace
|
||||
// the stubbed out vec above
|
||||
// with our initial set of read
|
||||
// data
|
||||
}
|
||||
else {
|
||||
// otherwise append
|
||||
read_bytes = new_bytes;
|
||||
}
|
||||
}
|
||||
_ {
|
||||
assert false;
|
||||
}
|
||||
}
|
||||
}
|
||||
io::println("finished reading data");
|
||||
let read_str = str::from_bytes(read_bytes);
|
||||
|
||||
|
||||
}
|
||||
else {
|
||||
io::println("direct::tcp_connect() failure");
|
||||
@ -1129,6 +1222,7 @@ fn impl_uv_byval_test() unsafe {
|
||||
addr.sin_port as uint));
|
||||
}
|
||||
#[test]
|
||||
#[ignore(cfg(target_os = "freebsd"))]
|
||||
fn test_uv_ip4_byval_passing_test() {
|
||||
impl_uv_byval_test();
|
||||
}
|
||||
|
@ -230,9 +230,13 @@ rust_uv_helper_sockaddr_in_size() {
|
||||
}
|
||||
|
||||
extern "C" uv_stream_t*
|
||||
rust_uv_get_stream_handle_for_connect(uv_connect_t* connect) {
|
||||
rust_uv_get_stream_handle_from_connect_req(uv_connect_t* connect) {
|
||||
return connect->handle;
|
||||
}
|
||||
extern "C" uv_stream_t*
|
||||
rust_uv_get_stream_handle_from_write_req(uv_write_t* write_req) {
|
||||
return write_req->handle;
|
||||
}
|
||||
|
||||
extern "C" uv_buf_t
|
||||
current_kernel_malloc_alloc_cb(uv_handle_t* handle,
|
||||
@ -275,6 +279,16 @@ rust_uv_set_data_for_req(uv_req_t* req, void* data) {
|
||||
req->data = data;
|
||||
}
|
||||
|
||||
extern "C" char*
|
||||
rust_uv_get_base_from_buf(uv_buf_t buf) {
|
||||
return buf.base;
|
||||
}
|
||||
|
||||
extern "C" size_t
|
||||
rust_uv_get_len_from_buf(uv_buf_t buf) {
|
||||
return buf.len;
|
||||
}
|
||||
|
||||
extern "C" uv_err_t
|
||||
rust_uv_last_error(uv_loop_t* loop) {
|
||||
return uv_last_error(loop);
|
||||
@ -302,6 +316,21 @@ rust_uv_write(uv_write_t* req, uv_stream_t* handle,
|
||||
uv_write_cb cb) {
|
||||
return uv_write(req, handle, bufs, buf_cnt, cb);
|
||||
}
|
||||
extern "C" int
|
||||
rust_uv_read_start(uv_stream_t* stream, uv_alloc_cb on_alloc,
|
||||
uv_read_cb on_read) {
|
||||
return uv_read_start(stream, on_alloc, on_read);
|
||||
}
|
||||
|
||||
extern "C" char*
|
||||
rust_uv_malloc_buf_base_of(size_t suggested_size) {
|
||||
return (char*) current_kernel_malloc(sizeof(char)*suggested_size,
|
||||
"uv_buf_t base");
|
||||
}
|
||||
extern "C" void
|
||||
rust_uv_free_base_of_buf(uv_buf_t buf) {
|
||||
current_kernel_free(buf.base);
|
||||
}
|
||||
|
||||
extern "C" struct sockaddr_in
|
||||
rust_uv_ip4_addr(const char* ip, int port) {
|
||||
|
Loading…
Reference in New Issue
Block a user