diff --git a/src/libstd/arc.rs b/src/libstd/arc.rs index b6139e93723..d6e59d6149f 100644 --- a/src/libstd/arc.rs +++ b/src/libstd/arc.rs @@ -420,12 +420,11 @@ mod tests { let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; let arc_v = arc::arc(v); - let p = port(); - let c = chan(p); + let (c, p) = pipes::stream(); do task::spawn() { - let p = port(); - c.send(chan(p)); + let p = pipes::port_set(); + c.send(p.chan()); let arc_v = p.recv(); diff --git a/src/libstd/comm.rs b/src/libstd/comm.rs new file mode 100644 index 00000000000..c2f3518976e --- /dev/null +++ b/src/libstd/comm.rs @@ -0,0 +1,76 @@ +/*! + +Higher level communication abstractions. + +*/ + +// NB: transitionary, de-mode-ing. +#[forbid(deprecated_mode)]; +#[forbid(deprecated_pattern)]; + +// Make sure we follow the new conventions +#[forbid(non_camel_case_types)]; + +import pipes::{channel, recv, chan, port, selectable}; + +export DuplexStream; + +/// An extension of `pipes::stream` that allows both sending and receiving. +struct DuplexStream : channel, recv, selectable { + priv chan: chan; + priv port: port; + + fn send(+x: T) { + self.chan.send(x) + } + + fn try_send(+x: T) -> bool { + self.chan.try_send(x) + } + + fn recv() -> U { + self.port.recv() + } + + fn try_recv() -> option { + self.port.try_recv() + } + + pure fn peek() -> bool { + self.port.peek() + } + + pure fn header() -> *pipes::packet_header { + self.port.header() + } +} + +/// Creates a bidirectional stream. +fn DuplexStream() + -> (DuplexStream, DuplexStream) +{ + let (c2, p1) = pipes::stream(); + let (c1, p2) = pipes::stream(); + (DuplexStream { + chan: c1, + port: p1 + }, + DuplexStream { + chan: c2, + port: p2 + }) +} + +#[cfg(test)] +mod test { + #[test] + fn DuplexStream1() { + let (left, right) = DuplexStream(); + + left.send(~"abc"); + right.send(123); + + assert left.recv() == 123; + assert right.recv() == ~"abc"; + } +} diff --git a/src/libstd/net_ip.rs b/src/libstd/net_ip.rs index b361e38ac29..bd4520e44a4 100644 --- a/src/libstd/net_ip.rs +++ b/src/libstd/net_ip.rs @@ -19,6 +19,7 @@ import create_uv_getaddrinfo_t = uv::ll::getaddrinfo_t; import set_data_for_req = uv::ll::set_data_for_req; import get_data_for_req = uv::ll::get_data_for_req; import ll = uv::ll; +import comm = core::comm; export ip_addr, parse_addr_err; export format_addr; @@ -85,7 +86,7 @@ enum ip_get_addr_err { */ fn get_addr(++node: ~str, iotask: iotask) -> result::result<~[ip_addr], ip_get_addr_err> unsafe { - do comm::listen |output_ch| { + do core::comm::listen |output_ch| { do str::as_buf(node) |node_ptr, len| { log(debug, fmt!{"slice len %?", len}); let handle = create_uv_getaddrinfo_t(); diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index e07075dd293..e14b15b4eff 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -9,6 +9,7 @@ import future_spawn = future::spawn; import result::*; import libc::size_t; import io::{Reader, Writer}; +import comm = core::comm; // tcp interfaces export tcp_socket; @@ -120,19 +121,19 @@ enum tcp_connect_err_data { fn connect(-input_ip: ip::ip_addr, port: uint, iotask: iotask) -> result::result unsafe { - let result_po = comm::port::(); - let closed_signal_po = comm::port::<()>(); + let result_po = core::comm::port::(); + let closed_signal_po = core::comm::port::<()>(); let conn_data = { - result_ch: comm::chan(result_po), - closed_signal_ch: comm::chan(closed_signal_po) + result_ch: core::comm::chan(result_po), + closed_signal_ch: core::comm::chan(closed_signal_po) }; let conn_data_ptr = ptr::addr_of(conn_data); - let reader_po = comm::port::>(); + let reader_po = core::comm::port::>(); let stream_handle_ptr = malloc_uv_tcp_t(); *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t(); let socket_data = @{ reader_po: reader_po, - reader_ch: comm::chan(reader_po), + reader_ch: core::comm::chan(reader_po), stream_handle_ptr: stream_handle_ptr, connect_req: uv::ll::connect_t(), write_req: uv::ll::write_t(), @@ -202,7 +203,7 @@ fn connect(-input_ip: ip::ip_addr, port: uint, // immediate connect failure.. probably a garbage // ip or somesuch let err_data = uv::ll::get_last_err_data(loop_ptr); - comm::send((*conn_data_ptr).result_ch, + core::comm::send((*conn_data_ptr).result_ch, conn_failure(err_data.to_tcp_err())); uv::ll::set_data_for_uv_handle(stream_handle_ptr, conn_data_ptr); @@ -215,18 +216,18 @@ fn connect(-input_ip: ip::ip_addr, port: uint, _ => { // failure to create a tcp handle let err_data = uv::ll::get_last_err_data(loop_ptr); - comm::send((*conn_data_ptr).result_ch, + core::comm::send((*conn_data_ptr).result_ch, conn_failure(err_data.to_tcp_err())); } } }; - match comm::recv(result_po) { + match core::comm::recv(result_po) { conn_success => { log(debug, ~"tcp::connect - received success on result_po"); result::ok(tcp_socket(socket_data)) } conn_failure(err_data) => { - comm::recv(closed_signal_po); + core::comm::recv(closed_signal_po); log(debug, ~"tcp::connect - received failure on result_po"); // still have to free the malloc'd stream handle.. rustrt::rust_uv_current_kernel_free(stream_handle_ptr @@ -311,8 +312,9 @@ fn write_future(sock: tcp_socket, raw_write_data: ~[u8]) * # Returns * * * A `result` instance that will either contain a - * `comm::port` that the user can read (and optionally, loop - * on) from until `read_stop` is called, or a `tcp_err_data` record + * `core::comm::port` that the user can read (and + * optionally, loop on) from until `read_stop` is called, or a + * `tcp_err_data` record */ fn read_start(sock: tcp_socket) -> result::result>(); - * let cont_ch = comm::chan(cont_po); + * let cont_po = core::comm::port::>(); + * let cont_ch = core::comm::chan(cont_po); * task::spawn {|| * let accept_result = net::tcp::accept(new_conn); * if accept_result.is_err() { - * comm::send(cont_ch, result::get_err(accept_result)); + * core::comm::send(cont_ch, result::get_err(accept_result)); * // fail? * } * else { * let sock = result::get(accept_result); - * comm::send(cont_ch, true); + * core::comm::send(cont_ch, true); * // do work here * } * }; - * match comm::recv(cont_po) { + * match core::comm::recv(cont_po) { * // shut down listen() - * some(err_data) { comm::send(kill_chan, some(err_data)) } + * some(err_data) { core::comm::send(kill_chan, some(err_data)) } * // wait for next connection * none {} * } @@ -470,13 +472,13 @@ fn accept(new_conn: tcp_new_connection) new_tcp_conn(server_handle_ptr) => { let server_data_ptr = uv::ll::get_data_for_uv_handle( server_handle_ptr) as *tcp_listen_fc_data; - let reader_po = comm::port::>(); + let reader_po = core::comm::port(); let iotask = (*server_data_ptr).iotask; let stream_handle_ptr = malloc_uv_tcp_t(); *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t(); let client_socket_data = @{ reader_po: reader_po, - reader_ch: comm::chan(reader_po), + reader_ch: core::comm::chan(reader_po), stream_handle_ptr : stream_handle_ptr, connect_req : uv::ll::connect_t(), write_req : uv::ll::write_t(), @@ -486,8 +488,8 @@ fn accept(new_conn: tcp_new_connection) let client_stream_handle_ptr = (*client_socket_data_ptr).stream_handle_ptr; - let result_po = comm::port::>(); - let result_ch = comm::chan(result_po); + let result_po = core::comm::port::>(); + let result_ch = core::comm::chan(result_po); // UNSAFE LIBUV INTERACTION BEGIN // .. normally this happens within the context of @@ -509,23 +511,23 @@ fn accept(new_conn: tcp_new_connection) uv::ll::set_data_for_uv_handle(client_stream_handle_ptr, client_socket_data_ptr as *libc::c_void); - comm::send(result_ch, none); + core::comm::send(result_ch, none); } _ => { log(debug, ~"failed to accept client conn"); - comm::send(result_ch, some( + core::comm::send(result_ch, some( uv::ll::get_last_err_data(loop_ptr).to_tcp_err())); } } } _ => { log(debug, ~"failed to init client stream"); - comm::send(result_ch, some( + core::comm::send(result_ch, some( uv::ll::get_last_err_data(loop_ptr).to_tcp_err())); } } // UNSAFE LIBUV INTERACTION END - match comm::recv(result_po) { + match core::comm::recv(result_po) { some(err_data) => result::err(err_data), none => result::ok(tcp_socket(client_socket_data)) } @@ -551,8 +553,8 @@ fn accept(new_conn: tcp_new_connection) * callback's arguments are: * * `new_conn` - an opaque type that can be passed to * `net::tcp::accept` in order to be converted to a `tcp_socket`. - * * `kill_ch` - channel of type `comm::chan>`. this - * channel can be used to send a message to cause `listen` to begin + * * `kill_ch` - channel of type `core::comm::chan>`. + * this channel can be used to send a message to cause `listen` to begin * closing the underlying libuv data structures. * * # returns @@ -583,14 +585,14 @@ fn listen_common(-host_ip: ip::ip_addr, port: uint, backlog: uint, on_establish_cb: fn~(comm::Chan>), -on_connect_cb: fn~(*uv::ll::uv_tcp_t)) -> result::result<(), tcp_listen_err_data> unsafe { - let stream_closed_po = comm::port::<()>(); - let kill_po = comm::port::>(); - let kill_ch = comm::chan(kill_po); + let stream_closed_po = core::comm::port::<()>(); + let kill_po = core::comm::port::>(); + let kill_ch = core::comm::chan(kill_po); let server_stream = uv::ll::tcp_t(); let server_stream_ptr = ptr::addr_of(server_stream); let server_data = { server_stream_ptr: server_stream_ptr, - stream_closed_ch: comm::chan(stream_closed_po), + stream_closed_ch: core::comm::chan(stream_closed_po), kill_ch: kill_ch, on_connect_cb: on_connect_cb, iotask: iotask, @@ -598,13 +600,13 @@ fn listen_common(-host_ip: ip::ip_addr, port: uint, backlog: uint, }; let server_data_ptr = ptr::addr_of(server_data); - let setup_result = do comm::listen |setup_ch| { + let setup_result = do core::comm::listen |setup_ch| { // this is to address a compiler warning about // an implicit copy.. it seems that double nested // will defeat a move sigil, as is done to the host_ip // arg above.. this same pattern works w/o complaint in // tcp::connect (because the iotask::interact cb isn't - // nested within a comm::listen block) + // nested within a core::comm::listen block) let loc_ip = copy(host_ip); do iotask::interact(iotask) |loop_ptr| { match uv::ll::tcp_init(loop_ptr, server_stream_ptr) { @@ -632,25 +634,25 @@ fn listen_common(-host_ip: ip::ip_addr, port: uint, backlog: uint, match uv::ll::listen(server_stream_ptr, backlog as libc::c_int, tcp_lfc_on_connection_cb) { - 0i32 => comm::send(setup_ch, none), + 0i32 => core::comm::send(setup_ch, none), _ => { log(debug, ~"failure to uv_listen()"); let err_data = uv::ll::get_last_err_data(loop_ptr); - comm::send(setup_ch, some(err_data)); + core::comm::send(setup_ch, some(err_data)); } } } _ => { log(debug, ~"failure to uv_tcp_bind"); let err_data = uv::ll::get_last_err_data(loop_ptr); - comm::send(setup_ch, some(err_data)); + core::comm::send(setup_ch, some(err_data)); } } } _ => { log(debug, ~"failure to uv_tcp_init"); let err_data = uv::ll::get_last_err_data(loop_ptr); - comm::send(setup_ch, some(err_data)); + core::comm::send(setup_ch, some(err_data)); } } }; @@ -684,7 +686,7 @@ fn listen_common(-host_ip: ip::ip_addr, port: uint, backlog: uint, } none => { on_establish_cb(kill_ch); - let kill_result = comm::recv(kill_po); + let kill_result = core::comm::recv(kill_po); do iotask::interact(iotask) |loop_ptr| { log(debug, fmt!{"tcp::listen post-kill recv hl interact %?", loop_ptr}); @@ -835,8 +837,8 @@ impl @tcp_socket_buf: io::Writer { // INTERNAL API fn tear_down_socket_data(socket_data: @tcp_socket_data) unsafe { - let closed_po = comm::port::<()>(); - let closed_ch = comm::chan(closed_po); + let closed_po = core::comm::port::<()>(); + let closed_ch = core::comm::chan(closed_po); let close_data = { closed_ch: closed_ch }; @@ -849,7 +851,7 @@ fn tear_down_socket_data(socket_data: @tcp_socket_data) unsafe { close_data_ptr); uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb); }; - comm::recv(closed_po); + core::comm::recv(closed_po); log(debug, fmt!{"about to free socket_data at %?", socket_data}); rustrt::rust_uv_current_kernel_free(stream_handle_ptr as *libc::c_void); @@ -872,7 +874,7 @@ fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint) timer::recv_timeout( iotask, timeout_msecs, result::get(rs_result)) } else { - some(comm::recv(result::get(rs_result))) + some(core::comm::recv(result::get(rs_result))) }; log(debug, ~"tcp::read after recv_timeout"); match read_result { @@ -898,23 +900,23 @@ fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint) fn read_stop_common_impl(socket_data: *tcp_socket_data) -> result::result<(), tcp_err_data> unsafe { let stream_handle_ptr = (*socket_data).stream_handle_ptr; - let stop_po = comm::port::>(); - let stop_ch = comm::chan(stop_po); + let stop_po = core::comm::port::>(); + let stop_ch = core::comm::chan(stop_po); do iotask::interact((*socket_data).iotask) |loop_ptr| { log(debug, ~"in interact cb for tcp::read_stop"); match uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) { 0i32 => { log(debug, ~"successfully called uv_read_stop"); - comm::send(stop_ch, none); + core::comm::send(stop_ch, none); } _ => { log(debug, ~"failure in calling uv_read_stop"); let err_data = uv::ll::get_last_err_data(loop_ptr); - comm::send(stop_ch, some(err_data.to_tcp_err())); + core::comm::send(stop_ch, some(err_data.to_tcp_err())); } } }; - match comm::recv(stop_po) { + match core::comm::recv(stop_po) { some(err_data) => result::err(err_data.to_tcp_err()), none => result::ok(()) } @@ -925,8 +927,8 @@ fn read_start_common_impl(socket_data: *tcp_socket_data) -> result::result>, tcp_err_data> unsafe { let stream_handle_ptr = (*socket_data).stream_handle_ptr; - let start_po = comm::port::>(); - let start_ch = comm::chan(start_po); + let start_po = core::comm::port::>(); + let start_ch = core::comm::chan(start_po); log(debug, ~"in tcp::read_start before interact loop"); do iotask::interact((*socket_data).iotask) |loop_ptr| { log(debug, fmt!{"in tcp::read_start interact cb %?", loop_ptr}); @@ -935,16 +937,16 @@ fn read_start_common_impl(socket_data: *tcp_socket_data) on_tcp_read_cb) { 0i32 => { log(debug, ~"success doing uv_read_start"); - comm::send(start_ch, none); + core::comm::send(start_ch, none); } _ => { log(debug, ~"error attempting uv_read_start"); let err_data = uv::ll::get_last_err_data(loop_ptr); - comm::send(start_ch, some(err_data)); + core::comm::send(start_ch, some(err_data)); } } }; - match comm::recv(start_po) { + match core::comm::recv(start_po) { some(err_data) => result::err(err_data.to_tcp_err()), none => result::ok((*socket_data).reader_po) } @@ -963,9 +965,9 @@ fn write_common_impl(socket_data_ptr: *tcp_socket_data, vec::unsafe::to_ptr(raw_write_data), vec::len(raw_write_data)) ]; let write_buf_vec_ptr = ptr::addr_of(write_buf_vec); - let result_po = comm::port::(); + let result_po = core::comm::port::(); let write_data = { - result_ch: comm::chan(result_po) + result_ch: core::comm::chan(result_po) }; let write_data_ptr = ptr::addr_of(write_data); do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| { @@ -981,7 +983,7 @@ fn write_common_impl(socket_data_ptr: *tcp_socket_data, _ => { log(debug, ~"error invoking uv_write()"); let err_data = uv::ll::get_last_err_data(loop_ptr); - comm::send((*write_data_ptr).result_ch, + core::comm::send((*write_data_ptr).result_ch, tcp_write_error(err_data.to_tcp_err())); } } @@ -990,7 +992,7 @@ fn write_common_impl(socket_data_ptr: *tcp_socket_data, // and waiting here for the write to complete, we should transfer // ownership of everything to the I/O task and let it deal with the // aftermath, so we don't have to sit here blocking. - match comm::recv(result_po) { + match core::comm::recv(result_po) { tcp_write_success => result::ok(()), tcp_write_error(err_data) => result::err(err_data.to_tcp_err()) } @@ -1012,7 +1014,7 @@ type tcp_listen_fc_data = { extern fn tcp_lfc_close_cb(handle: *uv::ll::uv_tcp_t) unsafe { let server_data_ptr = uv::ll::get_data_for_uv_handle( handle) as *tcp_listen_fc_data; - comm::send((*server_data_ptr).stream_closed_ch, ()); + core::comm::send((*server_data_ptr).stream_closed_ch, ()); } extern fn tcp_lfc_on_connection_cb(handle: *uv::ll::uv_tcp_t, @@ -1025,7 +1027,7 @@ extern fn tcp_lfc_on_connection_cb(handle: *uv::ll::uv_tcp_t, 0i32 => (*server_data_ptr).on_connect_cb(handle), _ => { let loop_ptr = uv::ll::get_loop_for_uv_handle(handle); - comm::send(kill_ch, + core::comm::send(kill_ch, some(uv::ll::get_last_err_data(loop_ptr) .to_tcp_err())); (*server_data_ptr).active = false; @@ -1085,7 +1087,7 @@ extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t, log(debug, fmt!{"on_tcp_read_cb: incoming err.. name %? msg %?", err_data.err_name, err_data.err_msg}); let reader_ch = (*socket_data_ptr).reader_ch; - comm::send(reader_ch, result::err(err_data)); + core::comm::send(reader_ch, result::err(err_data)); } // do nothing .. unneeded buf 0 => (), @@ -1096,7 +1098,7 @@ extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t, let reader_ch = (*socket_data_ptr).reader_ch; let buf_base = uv::ll::get_base_from_buf(buf); let new_bytes = vec::unsafe::from_buf(buf_base, nread as uint); - comm::send(reader_ch, result::ok(new_bytes)); + core::comm::send(reader_ch, result::ok(new_bytes)); } } uv::ll::free_base_of_buf(buf); @@ -1123,7 +1125,7 @@ extern fn tcp_socket_dtor_close_cb(handle: *uv::ll::uv_tcp_t) unsafe { let data = uv::ll::get_data_for_uv_handle(handle) as *tcp_socket_close_data; let closed_ch = (*data).closed_ch; - comm::send(closed_ch, ()); + core::comm::send(closed_ch, ()); log(debug, ~"tcp_socket_dtor_close_cb exiting.."); } @@ -1133,14 +1135,15 @@ extern fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t, as *write_req_data; if status == 0i32 { log(debug, ~"successful write complete"); - comm::send((*write_data_ptr).result_ch, tcp_write_success); + core::comm::send((*write_data_ptr).result_ch, tcp_write_success); } else { let stream_handle_ptr = uv::ll::get_stream_handle_from_write_req( write_req); let loop_ptr = uv::ll::get_loop_for_uv_handle(stream_handle_ptr); let err_data = uv::ll::get_last_err_data(loop_ptr); log(debug, ~"failure to write"); - comm::send((*write_data_ptr).result_ch, tcp_write_error(err_data)); + core::comm::send((*write_data_ptr).result_ch, + tcp_write_error(err_data)); } } @@ -1156,7 +1159,7 @@ type connect_req_data = { extern fn stream_error_close_cb(handle: *uv::ll::uv_tcp_t) unsafe { let data = uv::ll::get_data_for_uv_handle(handle) as *connect_req_data; - comm::send((*data).closed_signal_ch, ()); + core::comm::send((*data).closed_signal_ch, ()); log(debug, fmt!{"exiting steam_error_close_cb for %?", handle}); } @@ -1175,7 +1178,7 @@ extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t, match status { 0i32 => { log(debug, ~"successful tcp connection!"); - comm::send(result_ch, conn_success); + core::comm::send(result_ch, conn_success); } _ => { log(debug, ~"error in tcp_connect_on_connect_cb"); @@ -1183,7 +1186,7 @@ extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t, let err_data = uv::ll::get_last_err_data(loop_ptr); log(debug, fmt!{"err_data %? %?", err_data.err_name, err_data.err_msg}); - comm::send(result_ch, conn_failure(err_data)); + core::comm::send(result_ch, conn_failure(err_data)); uv::ll::set_data_for_uv_handle(tcp_stream_ptr, conn_data_ptr); uv::ll::close(tcp_stream_ptr, stream_error_close_cb); @@ -1279,11 +1282,11 @@ mod test { let expected_req = ~"ping"; let expected_resp = ~"pong"; - let server_result_po = comm::port::<~str>(); - let server_result_ch = comm::chan(server_result_po); + let server_result_po = core::comm::port::<~str>(); + let server_result_ch = core::comm::chan(server_result_po); - let cont_po = comm::port::<()>(); - let cont_ch = comm::chan(cont_po); + let cont_po = core::comm::port::<()>(); + let cont_ch = core::comm::chan(cont_po); // server do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do comm::listen |server_ch| { @@ -1297,10 +1300,10 @@ mod test { }; server_result_ch.send(actual_req); }; - comm::recv(cont_po); + core::comm::recv(cont_po); // client log(debug, ~"server started, firing up client.."); - let actual_resp_result = do comm::listen |client_ch| { + let actual_resp_result = do core::comm::listen |client_ch| { run_tcp_test_client( server_ip, server_port, @@ -1310,7 +1313,7 @@ mod test { }; assert actual_resp_result.is_ok(); let actual_resp = actual_resp_result.get(); - let actual_req = comm::recv(server_result_po); + let actual_req = core::comm::recv(server_result_po); log(debug, fmt!{"REQ: expected: '%s' actual: '%s'", expected_req, actual_req}); log(debug, fmt!{"RESP: expected: '%s' actual: '%s'", @@ -1325,7 +1328,7 @@ mod test { let expected_req = ~"ping"; // client log(debug, ~"firing up client.."); - let actual_resp_result = do comm::listen |client_ch| { + let actual_resp_result = do core::comm::listen |client_ch| { run_tcp_test_client( server_ip, server_port, @@ -1345,11 +1348,11 @@ mod test { let expected_req = ~"ping"; let expected_resp = ~"pong"; - let server_result_po = comm::port::<~str>(); - let server_result_ch = comm::chan(server_result_po); + let server_result_po = core::comm::port::<~str>(); + let server_result_ch = core::comm::chan(server_result_po); - let cont_po = comm::port::<()>(); - let cont_ch = comm::chan(cont_po); + let cont_po = core::comm::port::<()>(); + let cont_ch = core::comm::chan(cont_po); // server do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do comm::listen |server_ch| { @@ -1363,7 +1366,7 @@ mod test { }; server_result_ch.send(actual_req); }; - comm::recv(cont_po); + core::comm::recv(cont_po); // this one should fail.. let listen_err = run_tcp_test_server_fail( server_ip, @@ -1371,7 +1374,7 @@ mod test { hl_loop); // client.. just doing this so that the first server tears down log(debug, ~"server started, firing up client.."); - do comm::listen |client_ch| { + do core::comm::listen |client_ch| { run_tcp_test_client( server_ip, server_port, @@ -1415,11 +1418,11 @@ mod test { let expected_req = ~"ping"; let expected_resp = ~"pong"; - let server_result_po = comm::port::<~str>(); - let server_result_ch = comm::chan(server_result_po); + let server_result_po = core::comm::port::<~str>(); + let server_result_ch = core::comm::chan(server_result_po); - let cont_po = comm::port::<()>(); - let cont_ch = comm::chan(cont_po); + let cont_po = core::comm::port::<()>(); + let cont_ch = core::comm::chan(cont_po); // server do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do comm::listen |server_ch| { @@ -1433,7 +1436,7 @@ mod test { }; server_result_ch.send(actual_req); }; - comm::recv(cont_po); + core::comm::recv(cont_po); // client let server_addr = ip::v4::parse_addr(server_ip); let conn_result = connect(server_addr, server_port, iotask); @@ -1449,7 +1452,7 @@ mod test { vec::len(resp_buf)) }; - let actual_req = comm::recv(server_result_po); + let actual_req = core::comm::recv(server_result_po); log(debug, fmt!{"REQ: expected: '%s' actual: '%s'", expected_req, actual_req}); log(debug, fmt!{"RESP: expected: '%s' actual: '%s'", @@ -1484,7 +1487,7 @@ mod test { |kill_ch| { log(debug, fmt!{"establish_cb %?", kill_ch}); - comm::send(cont_ch, ()); + core::comm::send(cont_ch, ()); }, // risky to run this on the loop, but some users // will want the POWER @@ -1499,7 +1502,7 @@ mod test { if result::is_err(accept_result) { log(debug, ~"SERVER: error accept connection"); let err_data = result::get_err(accept_result); - comm::send(kill_ch, some(err_data)); + core::comm::send(kill_ch, some(err_data)); log(debug, ~"SERVER/WORKER: send on err cont ch"); cont_ch.send(()); @@ -1522,12 +1525,12 @@ mod test { log(debug, ~"SERVER: before write"); tcp_write_single(sock, str::bytes(resp)); log(debug, ~"SERVER: after write.. die"); - comm::send(kill_ch, none); + core::comm::send(kill_ch, none); } result::err(err_data) => { log(debug, fmt!{"SERVER: error recvd: %s %s", err_data.err_name, err_data.err_msg}); - comm::send(kill_ch, some(err_data)); + core::comm::send(kill_ch, some(err_data)); server_ch.send(~""); } } diff --git a/src/libstd/par.rs b/src/libstd/par.rs index 60996c1f773..7dbb07a6aad 100644 --- a/src/libstd/par.rs +++ b/src/libstd/par.rs @@ -1,7 +1,3 @@ -import comm::port; -import comm::chan; -import comm::send; -import comm::recv; import future_spawn = future::spawn; export map, mapi, alli, any, mapi_factory; diff --git a/src/libstd/std.rc b/src/libstd/std.rc index b2d9d3770fc..91a4ff23c19 100644 --- a/src/libstd/std.rc +++ b/src/libstd/std.rc @@ -52,6 +52,7 @@ mod cell; mod sync; mod arc; +mod comm; // Collections diff --git a/src/libstd/test.rs b/src/libstd/test.rs index b61ad7e5a61..09bde8d6b3f 100644 --- a/src/libstd/test.rs +++ b/src/libstd/test.rs @@ -10,6 +10,7 @@ import result::{ok, err}; import io::WriterUtil; import libc::size_t; import task::TaskBuilder; +import comm = core::comm; export test_name; export test_fn; @@ -285,8 +286,8 @@ fn run_tests(opts: test_opts, tests: ~[test_desc], let mut wait_idx = 0u; let mut done_idx = 0u; - let p = comm::port(); - let ch = comm::chan(p); + let p = core::comm::port(); + let ch = core::comm::chan(p); while done_idx < total { while wait_idx < concurrency && run_idx < total { @@ -302,7 +303,7 @@ fn run_tests(opts: test_opts, tests: ~[test_desc], run_idx += 1u; } - let (test, result) = comm::recv(p); + let (test, result) = core::comm::recv(p); if concurrency != 1u { callback(te_wait(copy test)); } @@ -381,7 +382,7 @@ type test_future = {test: test_desc, wait: fn@() -> test_result}; fn run_test(+test: test_desc, monitor_ch: comm::Chan) { if test.ignore { - comm::send(monitor_ch, (copy test, tr_ignored)); + core::comm::send(monitor_ch, (copy test, tr_ignored)); return; } @@ -419,10 +420,10 @@ mod tests { ignore: true, should_fail: false }; - let p = comm::port(); - let ch = comm::chan(p); + let p = core::comm::port(); + let ch = core::comm::chan(p); run_test(desc, ch); - let (_, res) = comm::recv(p); + let (_, res) = core::comm::recv(p); assert res != tr_ok; } @@ -435,10 +436,10 @@ mod tests { ignore: true, should_fail: false }; - let p = comm::port(); - let ch = comm::chan(p); + let p = core::comm::port(); + let ch = core::comm::chan(p); run_test(desc, ch); - let (_, res) = comm::recv(p); + let (_, res) = core::comm::recv(p); assert res == tr_ignored; } @@ -452,10 +453,10 @@ mod tests { ignore: false, should_fail: true }; - let p = comm::port(); - let ch = comm::chan(p); + let p = core::comm::port(); + let ch = core::comm::chan(p); run_test(desc, ch); - let (_, res) = comm::recv(p); + let (_, res) = core::comm::recv(p); assert res == tr_ok; } @@ -468,10 +469,10 @@ mod tests { ignore: false, should_fail: true }; - let p = comm::port(); - let ch = comm::chan(p); + let p = core::comm::port(); + let ch = core::comm::chan(p); run_test(desc, ch); - let (_, res) = comm::recv(p); + let (_, res) = core::comm::recv(p); assert res == tr_failed; } diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs index 468a846163b..6be8ff05bba 100644 --- a/src/libstd/timer.rs +++ b/src/libstd/timer.rs @@ -3,6 +3,8 @@ import uv = uv; import uv::iotask; import iotask::iotask; +import comm = core::comm; + export delayed_send, sleep, recv_timeout; /** @@ -24,8 +26,8 @@ export delayed_send, sleep, recv_timeout; fn delayed_send(iotask: iotask, msecs: uint, ch: comm::Chan, val: T) { unsafe { - let timer_done_po = comm::port::<()>(); - let timer_done_ch = comm::chan(timer_done_po); + let timer_done_po = core::comm::port::<()>(); + let timer_done_ch = core::comm::chan(timer_done_po); let timer_done_ch_ptr = ptr::addr_of(timer_done_ch); let timer = uv::ll::timer_t(); let timer_ptr = ptr::addr_of(timer); @@ -51,11 +53,11 @@ fn delayed_send(iotask: iotask, } }; // delayed_send_cb has been processed by libuv - comm::recv(timer_done_po); + core::comm::recv(timer_done_po); // notify the caller immediately - comm::send(ch, copy(val)); + core::comm::send(ch, copy(val)); // uv_close for this timer has been processed - comm::recv(timer_done_po); + core::comm::recv(timer_done_po); }; } @@ -71,10 +73,10 @@ fn delayed_send(iotask: iotask, * * msecs - an amount of time, in milliseconds, for the current task to block */ fn sleep(iotask: iotask, msecs: uint) { - let exit_po = comm::port::<()>(); - let exit_ch = comm::chan(exit_po); + let exit_po = core::comm::port::<()>(); + let exit_ch = core::comm::chan(exit_po); delayed_send(iotask, msecs, exit_ch, ()); - comm::recv(exit_po); + core::comm::recv(exit_po); } /** @@ -89,7 +91,7 @@ fn sleep(iotask: iotask, msecs: uint) { * * * `iotask' - `uv::iotask` that the tcp request will run on * * msecs - an mount of time, in milliseconds, to wait to receive - * * wait_port - a `comm::port` to receive on + * * wait_port - a `core::comm::port` to receive on * * # Returns * @@ -111,7 +113,7 @@ fn recv_timeout(iotask: iotask, none }, |right_val| { some(*right_val) - }, &comm::select2(timeout_po, wait_po) + }, &core::comm::select2(timeout_po, wait_po) ) } @@ -123,7 +125,7 @@ extern fn delayed_send_cb(handle: *uv::ll::uv_timer_t, *(uv::ll::get_data_for_uv_handle(handle) as *comm::Chan<()>); let stop_result = uv::ll::timer_stop(handle); if (stop_result == 0i32) { - comm::send(timer_done_ch, ()); + core::comm::send(timer_done_ch, ()); uv::ll::close(handle, delayed_send_close_cb); } else { @@ -158,8 +160,8 @@ mod test { #[test] fn test_gl_timer_sleep_stress2() { - let po = comm::port(); - let ch = comm::chan(po); + let po = core::comm::port(); + let ch = core::comm::chan(po); let hl_loop = uv::global_loop::get(); let repeat = 20u; @@ -181,13 +183,13 @@ mod test { for iter::repeat(times) { sleep(hl_loop, rng.next() as uint % maxms); } - comm::send(ch, ()); + core::comm::send(ch, ()); } } } for iter::repeat(repeat * spec.len()) { - comm::recv(po) + core::comm::recv(po) } } @@ -208,8 +210,8 @@ mod test { task::yield(); let expected = rand::rng().gen_str(16u); - let test_po = comm::port::(); - let test_ch = comm::chan(test_po); + let test_po = core::comm::port::(); + let test_ch = core::comm::chan(test_po); do task::spawn() { delayed_send(hl_loop, 1u, test_ch, expected); @@ -236,8 +238,8 @@ mod test { for iter::repeat(times as uint) { let expected = rand::rng().gen_str(16u); - let test_po = comm::port::<~str>(); - let test_ch = comm::chan(test_po); + let test_po = core::comm::port::<~str>(); + let test_ch = core::comm::chan(test_po); do task::spawn() { delayed_send(hl_loop, 1000u, test_ch, expected); diff --git a/src/libstd/uv_global_loop.rs b/src/libstd/uv_global_loop.rs index b28b0033a01..669b4f16e1b 100644 --- a/src/libstd/uv_global_loop.rs +++ b/src/libstd/uv_global_loop.rs @@ -7,6 +7,7 @@ import iotask = uv_iotask; import get_gl = get; import iotask::{iotask, spawn_iotask}; import priv::{chan_from_global_ptr, weaken_task}; +import comm = core::comm; import comm::{Port, Chan, port, chan, select2, listen}; import task::TaskBuilder; import either::{Left, Right}; @@ -110,7 +111,7 @@ mod test { let exit_ch_ptr = ll::get_data_for_uv_handle( timer_ptr as *libc::c_void) as *comm::Chan; let exit_ch = *exit_ch_ptr; - comm::send(exit_ch, true); + core::comm::send(exit_ch, true); log(debug, fmt!{"EXIT_CH_PTR simple_timer_close_cb exit_ch_ptr: %?", exit_ch_ptr}); } @@ -129,8 +130,8 @@ mod test { } fn impl_uv_hl_simple_timer(iotask: iotask) unsafe { - let exit_po = comm::port::(); - let exit_ch = comm::chan(exit_po); + let exit_po = core::comm::port::(); + let exit_ch = core::comm::chan(exit_po); let exit_ch_ptr = ptr::addr_of(exit_ch); log(debug, fmt!{"EXIT_CH_PTR newly created exit_ch_ptr: %?", exit_ch_ptr}); @@ -155,7 +156,7 @@ mod test { fail ~"failure on ll::timer_init()"; } }; - comm::recv(exit_po); + core::comm::recv(exit_po); log(debug, ~"global_loop timer test: msg recv on exit_po, done.."); } @@ -166,10 +167,10 @@ mod test { let exit_ch = comm::chan(exit_po); task::spawn_sched(task::ManualThreads(1u), || { impl_uv_hl_simple_timer(hl_loop); - comm::send(exit_ch, ()); + core::comm::send(exit_ch, ()); }); impl_uv_hl_simple_timer(hl_loop); - comm::recv(exit_po); + core::comm::recv(exit_po); } // keeping this test ignored until some kind of stress-test-harness @@ -178,17 +179,17 @@ mod test { #[ignore] fn test_stress_gl_uv_global_loop_high_level_global_timer() unsafe { let hl_loop = get_gl(); - let exit_po = comm::port::<()>(); - let exit_ch = comm::chan(exit_po); + let exit_po = core::comm::port::<()>(); + let exit_ch = core::comm::chan(exit_po); let cycles = 5000u; for iter::repeat(cycles) { task::spawn_sched(task::ManualThreads(1u), || { impl_uv_hl_simple_timer(hl_loop); - comm::send(exit_ch, ()); + core::comm::send(exit_ch, ()); }); }; for iter::repeat(cycles) { - comm::recv(exit_po); + core::comm::recv(exit_po); }; log(debug, ~"test_stress_gl_uv_global_loop_high_level_global_timer"+ ~" exiting sucessfully!"); diff --git a/src/libstd/uv_iotask.rs b/src/libstd/uv_iotask.rs index dd7063e4c9b..f6ccbeb5d0d 100644 --- a/src/libstd/uv_iotask.rs +++ b/src/libstd/uv_iotask.rs @@ -12,6 +12,7 @@ export exit; import libc::c_void; import ptr::addr_of; +import comm = core::comm; import comm::{Port, port, Chan, chan, listen}; import task::TaskBuilder; import ll = uv_ll; @@ -171,7 +172,7 @@ mod test { log(debug, fmt!{"async_close_cb handle %?", handle}); let exit_ch = (*(ll::get_data_for_uv_handle(handle) as *ah_data)).exit_ch; - comm::send(exit_ch, ()); + core::comm::send(exit_ch, ()); } extern fn async_handle_cb(handle: *ll::uv_async_t, status: libc::c_int) unsafe { @@ -185,8 +186,8 @@ mod test { fn impl_uv_iotask_async(iotask: iotask) unsafe { let async_handle = ll::async_t(); let ah_ptr = ptr::addr_of(async_handle); - let exit_po = comm::port::<()>(); - let exit_ch = comm::chan(exit_po); + let exit_po = core::comm::port::<()>(); + let exit_ch = core::comm::chan(exit_po); let ah_data = { iotask: iotask, exit_ch: exit_ch @@ -197,7 +198,7 @@ mod test { ll::set_data_for_uv_handle(ah_ptr, ah_data_ptr as *libc::c_void); ll::async_send(ah_ptr); }; - comm::recv(exit_po); + core::comm::recv(exit_po); } // this fn documents the bear minimum neccesary to roll your own @@ -209,7 +210,7 @@ mod test { run_loop(iotask_ch); exit_ch.send(()); }; - return comm::recv(iotask_port); + return core::comm::recv(iotask_port); } extern fn lifetime_handle_close(handle: *libc::c_void) unsafe { @@ -224,8 +225,8 @@ mod test { #[test] fn test_uv_iotask_async() unsafe { - let exit_po = comm::port::<()>(); - let exit_ch = comm::chan(exit_po); + let exit_po = core::comm::port::<()>(); + let exit_ch = core::comm::chan(exit_po); let iotask = spawn_test_loop(exit_ch); // using this handle to manage the lifetime of the high_level_loop, @@ -234,20 +235,20 @@ mod test { // under race-condition type situations.. this ensures that the loop // lives until, at least, all of the impl_uv_hl_async() runs have been // called, at least. - let work_exit_po = comm::port::<()>(); - let work_exit_ch = comm::chan(work_exit_po); + let work_exit_po = core::comm::port::<()>(); + let work_exit_ch = core::comm::chan(work_exit_po); for iter::repeat(7u) { do task::spawn_sched(task::ManualThreads(1u)) { impl_uv_iotask_async(iotask); - comm::send(work_exit_ch, ()); + core::comm::send(work_exit_ch, ()); }; }; for iter::repeat(7u) { - comm::recv(work_exit_po); + core::comm::recv(work_exit_po); }; log(debug, ~"sending teardown_loop msg.."); exit(iotask); - comm::recv(exit_po); + core::comm::recv(exit_po); log(debug, ~"after recv on exit_po.. exiting.."); } } diff --git a/src/libstd/uv_ll.rs b/src/libstd/uv_ll.rs index c4c9d7c4607..0737e61a7c1 100644 --- a/src/libstd/uv_ll.rs +++ b/src/libstd/uv_ll.rs @@ -22,6 +22,7 @@ import libc::size_t; import ptr::assimilate; +import comm = core::comm; // libuv struct mappings type uv_ip4_addr = { @@ -1046,7 +1047,7 @@ mod test { let bytes = vec::unsafe::from_buf(buf_base, buf_len as uint); let read_chan = *((*client_data).read_chan); let msg_from_server = str::from_bytes(bytes); - comm::send(read_chan, msg_from_server); + core::comm::send(read_chan, msg_from_server); close(stream as *libc::c_void, after_close_cb) } else if (nread == -1) { @@ -1231,7 +1232,7 @@ mod test { log(debug, ~"SERVER: sending response to client"); read_stop(client_stream_ptr); let server_chan = *((*client_data).server_chan); - comm::send(server_chan, request_str); + core::comm::send(server_chan, request_str); let write_result = write( write_req, client_stream_ptr as *libc::c_void, @@ -1346,7 +1347,7 @@ mod test { async_handle as *libc::c_void) as *async_handle_data; let continue_chan = *((*data).continue_chan); let should_continue = status == 0i32; - comm::send(continue_chan, should_continue); + core::comm::send(continue_chan, should_continue); close(async_handle as *libc::c_void, async_close_cb); } @@ -1460,13 +1461,13 @@ mod test { let port = 8887; let kill_server_msg = ~"does a dog have buddha nature?"; let server_resp_msg = ~"mu!"; - let client_port = comm::port::<~str>(); - let client_chan = comm::chan::<~str>(client_port); - let server_port = comm::port::<~str>(); - let server_chan = comm::chan::<~str>(server_port); + let client_port = core::comm::port::<~str>(); + let client_chan = core::comm::chan::<~str>(client_port); + let server_port = core::comm::port::<~str>(); + let server_chan = core::comm::chan::<~str>(server_port); - let continue_port = comm::port::(); - let continue_chan = comm::chan::(continue_port); + let continue_port = core::comm::port::(); + let continue_chan = core::comm::chan::(continue_port); let continue_chan_ptr = ptr::addr_of(continue_chan); do task::spawn_sched(task::ManualThreads(1u)) { @@ -1479,7 +1480,7 @@ mod test { // block until the server up is.. possibly a race? log(debug, ~"before receiving on server continue_port"); - comm::recv(continue_port); + core::comm::recv(continue_port); log(debug, ~"received on continue port, set up tcp client"); do task::spawn_sched(task::ManualThreads(1u)) { @@ -1488,8 +1489,8 @@ mod test { ptr::addr_of(client_chan)); }; - let msg_from_client = comm::recv(server_port); - let msg_from_server = comm::recv(client_port); + let msg_from_client = core::comm::recv(server_port); + let msg_from_server = core::comm::recv(client_port); assert str::contains(msg_from_client, kill_server_msg); assert str::contains(msg_from_server, server_resp_msg);