From 877747d0acd53a7ec271d97d6620347cda0430c6 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Mon, 26 Mar 2012 11:09:57 -0700 Subject: [PATCH] wired up uv_read_start and some helper funcs around uv_alloc_cb tasks --- src/libstd/uv.rs | 146 +++++++++++++++++++++++++++++++++++++-------- src/rt/rust_uv.cpp | 31 +++++++++- 2 files changed, 150 insertions(+), 27 deletions(-) diff --git a/src/libstd/uv.rs b/src/libstd/uv.rs index 33676bc470d..fe22df30ae1 100644 --- a/src/libstd/uv.rs +++ b/src/libstd/uv.rs @@ -285,6 +285,10 @@ fn rust_uv_tcp_connect(connect_ptr: *uv_connect_t, fn rust_uv_write(req: *libc::c_void, stream: *libc::c_void, ++buf_in: *uv_buf_t, buf_cnt: libc::c_int, cb: *u8) -> libc::c_int; + fn rust_uv_read_start(stream: *libc::c_void, on_alloc: *u8, + on_read: *u8) -> libc::c_int; + fn rust_uv_malloc_buf_base_of(sug_size: libc::size_t) -> *u8; + fn rust_uv_free_base_of_buf(++buf: uv_buf_t); // sizeof testing helpers fn rust_uv_helper_uv_tcp_t_size() -> libc::c_uint; @@ -295,7 +299,11 @@ fn rust_uv_write(req: *libc::c_void, stream: *libc::c_void, fn rust_uv_helper_sockaddr_in_size() -> libc::c_uint; // data accessors for rust-mapped uv structs - fn rust_uv_get_stream_handle_for_connect(connect: *uv_connect_t) + fn rust_uv_get_stream_handle_from_connect_req( + connect_req: *uv_connect_t) + -> *uv_stream_t; + fn rust_uv_get_stream_handle_from_write_req( + write_req: *uv_write_t) -> *uv_stream_t; fn rust_uv_get_loop_for_uv_handle(handle: *libc::c_void) -> *libc::c_void; @@ -347,6 +355,11 @@ unsafe fn write(req: *libc::c_void, stream: *libc::c_void, let buf_cnt = vec::len(*buf_in) as i32; ret rustrt::rust_uv_write(req, stream, buf_ptr, buf_cnt, cb); } + unsafe fn read_start(stream: *uv_stream_t, on_alloc: *u8, + on_read: *u8) -> libc::c_int { + ret rustrt::rust_uv_read_start(stream as *libc::c_void, + on_alloc, on_read); + } unsafe fn uv_last_error(loop_handle: *libc::c_void) -> uv_err_t { ret rustrt::rust_uv_last_error(loop_handle); @@ -366,9 +379,16 @@ unsafe fn get_loop_for_uv_handle(handle: *libc::c_void) -> *libc::c_void { ret rustrt::rust_uv_get_loop_for_uv_handle(handle); } - unsafe fn get_stream_handle_for_connect(connect: *uv_connect_t) + unsafe fn get_stream_handle_from_connect_req(connect: *uv_connect_t) -> *uv_stream_t { - ret rustrt::rust_uv_get_stream_handle_for_connect(connect); + ret rustrt::rust_uv_get_stream_handle_from_connect_req( + connect); + } + unsafe fn get_stream_handle_from_write_req( + write_req: *uv_write_t) + -> *uv_stream_t { + ret rustrt::rust_uv_get_stream_handle_from_write_req( + write_req); } unsafe fn get_data_for_req(req: *libc::c_void) -> *libc::c_void { @@ -392,10 +412,12 @@ unsafe fn ip4_addr(ip: str, port: int) ret rustrt::rust_uv_ip4_addr(addr_vec_ptr, port as libc::c_int); } - // this is lame. - // TODO: see github issue #1402 - unsafe fn free_1402(ptr: *libc::c_void) { - rustrt::rust_uv_free(ptr); + unsafe fn malloc_buf_base_of(suggested_size: libc::size_t) + -> *u8 { + ret rustrt::rust_uv_malloc_buf_base_of(suggested_size); + } + unsafe fn free_base_of_buf(buf: uv_buf_t) { + rustrt::rust_uv_free_base_of_buf(buf); } } @@ -948,42 +970,80 @@ fn test_uv_timer() { // BEGIN TCP REQUEST TEST SUITE +enum tcp_read_data { + tcp_read_eof, + tcp_read_more([u8]), + tcp_read_error +} + type request_wrapper = { write_req: *uv_write_t, - req_buf: *[uv_buf_t] + req_buf: *[uv_buf_t], + read_chan: comm::chan }; -crust fn on_write_complete_cb(write_handle: *uv_write_t, +crust fn on_alloc_cb(handle: *libc::c_void, + suggested_size: libc::size_t) -> uv_buf_t + unsafe { + io::println("on_alloc_cb!"); + let char_ptr = direct::malloc_buf_base_of(suggested_size); + ret direct::buf_init(char_ptr, suggested_size); +} + +// do I need the explicit copy on the buf param? +crust fn on_read_cb(stream: *uv_stream_t, nread: libc::ssize_t, + ++buf: uv_buf_t) unsafe { + if (nread > 0) { + // we have data + io::println(#fmt("read: data! nread: %d", nread)); + } + else if (nread == -1) { + // err .. possibly EOF + io::println("read: eof!"); + } + else { + // nread == 0 .. do nothing, just free buf as below + io::println("read: do nothing!"); + } + // when we're done + direct::free_base_of_buf(buf); + io::println("exiting on_read_cb"); +} + +crust fn on_write_complete_cb(write_req: *uv_write_t, status: libc::c_int) unsafe { io::println(#fmt("beginning on_write_complete_cb status: %d", status as int)); - io::println("ending on_write_complete_cb"); + let stream = direct::get_stream_handle_from_write_req(write_req); + io::println(#fmt("on_write_complete_cb: tcp stream: %d write_handle addr %d", + stream as int, write_req as int)); + let result = direct::read_start(stream, on_alloc_cb, on_read_cb); + io::println(#fmt("ending on_write_complete_cb .. uv_read_start status: %d", result as int)); } -crust fn on_connect_cb(connect_handle_ptr: *uv_connect_t, +crust fn on_connect_cb(connect_req_ptr: *uv_connect_t, status: libc::c_int) unsafe { io::println(#fmt("beginning on_connect_cb .. status: %d", status as int)); - let stream = direct::get_stream_handle_for_connect(connect_handle_ptr); + let stream = + direct::get_stream_handle_from_connect_req(connect_req_ptr); if (status == 0i32) { io::println("on_connect_cb: in status=0 if.."); - let data = direct::get_data_for_req( - connect_handle_ptr as *libc::c_void) + let client_data = direct::get_data_for_req( + connect_req_ptr as *libc::c_void) as *request_wrapper; - let write_handle = (*data).write_req as *libc::c_void; + let write_handle = (*client_data).write_req as *libc::c_void; io::println(#fmt("on_connect_cb: tcp stream: %d write_handle addr %d", stream as int, write_handle as int)); let write_result = direct::write(write_handle, stream as *libc::c_void, - (*data).req_buf, + (*client_data).req_buf, on_write_complete_cb); io::println(#fmt("on_connect_cb: direct::write() status: %d", write_result as int)); } else { - let loop_handle = direct::get_loop_for_uv_handle( - stream as *libc::c_void); - let err = direct::uv_last_error(loop_handle); + io::println("non-zero status for on_connect_cb.."); } io::println("finishing on_connect_cb"); } @@ -993,7 +1053,7 @@ fn impl_uv_tcp_request() unsafe { let tcp_handle = direct::tcp_t(); let tcp_handle_ptr = ptr::addr_of(tcp_handle); let connect_handle = direct::connect_t(); - let connect_handle_ptr = ptr::addr_of(connect_handle); + let connect_req_ptr = ptr::addr_of(connect_handle); // this is the persistent payload of data that we // need to pass around to get this example to work. @@ -1013,8 +1073,11 @@ fn impl_uv_tcp_request() unsafe { io::println(#fmt("tcp req: tcp stream: %d write_handle: %d", tcp_handle_ptr as int, write_handle_ptr as int)); - let req = { writer_handle: write_handle_ptr, - req_buf: ptr::addr_of(req_msg) }; + let read_port = comm::port::(); + let read_chan = comm::chan::(read_port); + let client_data = { writer_handle: write_handle_ptr, + req_buf: ptr::addr_of(req_msg), + read_chan: read_chan }; let tcp_init_result = direct::tcp_init( test_loop as *libc::c_void, tcp_handle_ptr); @@ -1030,18 +1093,48 @@ fn impl_uv_tcp_request() unsafe { // this should set up the connection request.. let tcp_connect_result = direct::tcp_connect( - connect_handle_ptr, tcp_handle_ptr, + connect_req_ptr, tcp_handle_ptr, addr, on_connect_cb); if (tcp_connect_result == 0i32) { // not set the data on the connect_req // until its initialized direct::set_data_for_req( - connect_handle_ptr as *libc::c_void, - ptr::addr_of(req) as *libc::c_void); + connect_req_ptr as *libc::c_void, + ptr::addr_of(client_data) as *libc::c_void); io::println("before run tcp req loop"); direct::run(test_loop); io::println("after run tcp req loop"); - // TODO: see github issue #1402 + + // now we read from the port to get data + let mut read_bytes: [u8] = [0u8]; + let mut more_data = true; + while(more_data) { + alt comm::recv(read_port) { + tcp_read_eof { + more_data = false; + } + tcp_read_more(new_bytes) { + if (vec::len(read_bytes) == 1u && + read_bytes[0] == 0u8) { + // the "first" read.. replace + // the stubbed out vec above + // with our initial set of read + // data + } + else { + // otherwise append + read_bytes = new_bytes; + } + } + _ { + assert false; + } + } + } + io::println("finished reading data"); + let read_str = str::from_bytes(read_bytes); + + } else { io::println("direct::tcp_connect() failure"); @@ -1129,6 +1222,7 @@ fn impl_uv_byval_test() unsafe { addr.sin_port as uint)); } #[test] +#[ignore(cfg(target_os = "freebsd"))] fn test_uv_ip4_byval_passing_test() { impl_uv_byval_test(); } diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index b4694ccf37c..f753c55c69e 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -230,9 +230,13 @@ rust_uv_helper_sockaddr_in_size() { } extern "C" uv_stream_t* -rust_uv_get_stream_handle_for_connect(uv_connect_t* connect) { +rust_uv_get_stream_handle_from_connect_req(uv_connect_t* connect) { return connect->handle; } +extern "C" uv_stream_t* +rust_uv_get_stream_handle_from_write_req(uv_write_t* write_req) { + return write_req->handle; +} extern "C" uv_buf_t current_kernel_malloc_alloc_cb(uv_handle_t* handle, @@ -275,6 +279,16 @@ rust_uv_set_data_for_req(uv_req_t* req, void* data) { req->data = data; } +extern "C" char* +rust_uv_get_base_from_buf(uv_buf_t buf) { + return buf.base; +} + +extern "C" size_t +rust_uv_get_len_from_buf(uv_buf_t buf) { + return buf.len; +} + extern "C" uv_err_t rust_uv_last_error(uv_loop_t* loop) { return uv_last_error(loop); @@ -302,6 +316,21 @@ rust_uv_write(uv_write_t* req, uv_stream_t* handle, uv_write_cb cb) { return uv_write(req, handle, bufs, buf_cnt, cb); } +extern "C" int +rust_uv_read_start(uv_stream_t* stream, uv_alloc_cb on_alloc, + uv_read_cb on_read) { + return uv_read_start(stream, on_alloc, on_read); +} + +extern "C" char* +rust_uv_malloc_buf_base_of(size_t suggested_size) { + return (char*) current_kernel_malloc(sizeof(char)*suggested_size, + "uv_buf_t base"); +} +extern "C" void +rust_uv_free_base_of_buf(uv_buf_t buf) { + current_kernel_free(buf.base); +} extern "C" struct sockaddr_in rust_uv_ip4_addr(const char* ip, int port) {