io: Bind to shutdown() for TCP streams
This is something that is plausibly useful, and is provided by libuv. This is not currently surfaced as part of the `TcpStream` type, but it may possibly appear in the future. For now only the raw functionality is provided through the Rtio objects.
This commit is contained in:
parent
3316a0e6b2
commit
a63deeb3d3
@ -351,6 +351,11 @@ impl rtio::RtioTcpStream for TcpStream {
|
||||
fn clone(&self) -> ~rtio::RtioTcpStream {
|
||||
~TcpStream { inner: self.inner.clone() } as ~rtio::RtioTcpStream
|
||||
}
|
||||
fn close_write(&mut self) -> IoResult<()> {
|
||||
super::mkerr_libc(unsafe {
|
||||
libc::shutdown(self.fd(), libc::SHUT_WR)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl rtio::RtioSocket for TcpStream {
|
||||
|
@ -305,6 +305,38 @@ impl rtio::RtioTcpStream for TcpWatcher {
|
||||
read_access: self.read_access.clone(),
|
||||
} as ~rtio::RtioTcpStream
|
||||
}
|
||||
|
||||
fn close_write(&mut self) -> Result<(), IoError> {
|
||||
struct Ctx {
|
||||
slot: Option<BlockedTask>,
|
||||
status: c_int,
|
||||
}
|
||||
let mut req = Request::new(uvll::UV_SHUTDOWN);
|
||||
|
||||
return match unsafe {
|
||||
uvll::uv_shutdown(req.handle, self.handle, shutdown_cb)
|
||||
} {
|
||||
0 => {
|
||||
req.defuse(); // uv callback now owns this request
|
||||
let mut cx = Ctx { slot: None, status: 0 };
|
||||
|
||||
wait_until_woken_after(&mut cx.slot, &self.uv_loop(), || {
|
||||
req.set_data(&cx);
|
||||
});
|
||||
|
||||
status_to_io_result(cx.status)
|
||||
}
|
||||
n => Err(uv_error_to_io_error(UvError(n)))
|
||||
};
|
||||
|
||||
extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
|
||||
let req = Request::wrap(req);
|
||||
assert!(status != uvll::ECANCELED);
|
||||
let cx: &mut Ctx = unsafe { req.get_data() };
|
||||
cx.status = status;
|
||||
wakeup(&mut cx.slot);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UvHandle<uvll::uv_tcp_t> for TcpWatcher {
|
||||
|
@ -157,6 +157,7 @@ pub type uv_process_t = c_void;
|
||||
pub type uv_pipe_t = c_void;
|
||||
pub type uv_tty_t = c_void;
|
||||
pub type uv_signal_t = c_void;
|
||||
pub type uv_shutdown_t = c_void;
|
||||
|
||||
pub struct uv_timespec_t {
|
||||
tv_sec: libc::c_long,
|
||||
@ -248,6 +249,7 @@ pub type uv_exit_cb = extern "C" fn(handle: *uv_process_t,
|
||||
pub type uv_signal_cb = extern "C" fn(handle: *uv_signal_t,
|
||||
signum: c_int);
|
||||
pub type uv_fs_cb = extern "C" fn(req: *uv_fs_t);
|
||||
pub type uv_shutdown_cb = extern "C" fn(req: *uv_shutdown_t, status: c_int);
|
||||
|
||||
#[cfg(unix)] pub type uv_uid_t = libc::types::os::arch::posix88::uid_t;
|
||||
#[cfg(unix)] pub type uv_gid_t = libc::types::os::arch::posix88::gid_t;
|
||||
@ -539,6 +541,8 @@ extern {
|
||||
on_alloc: uv_alloc_cb,
|
||||
on_read: uv_read_cb) -> c_int;
|
||||
pub fn uv_read_stop(stream: *uv_stream_t) -> c_int;
|
||||
pub fn uv_shutdown(req: *uv_shutdown_t, handle: *uv_stream_t,
|
||||
cb: uv_shutdown_cb) -> c_int;
|
||||
|
||||
// idle bindings
|
||||
pub fn uv_idle_init(l: *uv_loop_t, i: *uv_idle_t) -> c_int;
|
||||
|
@ -751,5 +751,23 @@ mod test {
|
||||
|
||||
p.recv();
|
||||
})
|
||||
|
||||
iotest!(fn shutdown_smoke() {
|
||||
use rt::rtio::RtioTcpStream;
|
||||
|
||||
let addr = next_test_ip4();
|
||||
let a = TcpListener::bind(addr).unwrap().listen();
|
||||
spawn(proc() {
|
||||
let mut a = a;
|
||||
let mut c = a.accept().unwrap();
|
||||
assert_eq!(c.read_to_end(), Ok(~[]));
|
||||
c.write([1]).unwrap();
|
||||
});
|
||||
|
||||
let mut s = TcpStream::connect(addr).unwrap();
|
||||
assert!(s.obj.close_write().is_ok());
|
||||
assert!(s.write([1]).is_err());
|
||||
assert_eq!(s.read_to_end(), Ok(~[1]));
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -1611,6 +1611,10 @@ pub mod consts {
|
||||
pub static SO_KEEPALIVE: c_int = 8;
|
||||
pub static SO_BROADCAST: c_int = 32;
|
||||
pub static SO_REUSEADDR: c_int = 4;
|
||||
|
||||
pub static SHUT_RD: c_int = 0;
|
||||
pub static SHUT_WR: c_int = 1;
|
||||
pub static SHUT_RDWR: c_int = 2;
|
||||
}
|
||||
pub mod extra {
|
||||
use libc::types::os::arch::c95::c_int;
|
||||
@ -2391,6 +2395,10 @@ pub mod consts {
|
||||
pub static SO_KEEPALIVE: c_int = 9;
|
||||
pub static SO_BROADCAST: c_int = 6;
|
||||
pub static SO_REUSEADDR: c_int = 2;
|
||||
|
||||
pub static SHUT_RD: c_int = 0;
|
||||
pub static SHUT_WR: c_int = 1;
|
||||
pub static SHUT_RDWR: c_int = 2;
|
||||
}
|
||||
#[cfg(target_arch = "x86")]
|
||||
#[cfg(target_arch = "x86_64")]
|
||||
@ -2842,6 +2850,10 @@ pub mod consts {
|
||||
pub static SO_KEEPALIVE: c_int = 0x0008;
|
||||
pub static SO_BROADCAST: c_int = 0x0020;
|
||||
pub static SO_REUSEADDR: c_int = 0x0004;
|
||||
|
||||
pub static SHUT_RD: c_int = 0;
|
||||
pub static SHUT_WR: c_int = 1;
|
||||
pub static SHUT_RDWR: c_int = 2;
|
||||
}
|
||||
pub mod extra {
|
||||
use libc::types::os::arch::c95::c_int;
|
||||
@ -3221,6 +3233,10 @@ pub mod consts {
|
||||
pub static SO_KEEPALIVE: c_int = 0x0008;
|
||||
pub static SO_BROADCAST: c_int = 0x0020;
|
||||
pub static SO_REUSEADDR: c_int = 0x0004;
|
||||
|
||||
pub static SHUT_RD: c_int = 0;
|
||||
pub static SHUT_WR: c_int = 1;
|
||||
pub static SHUT_RDWR: c_int = 2;
|
||||
}
|
||||
pub mod extra {
|
||||
use libc::types::os::arch::c95::c_int;
|
||||
@ -3939,6 +3955,7 @@ pub mod funcs {
|
||||
pub fn sendto(socket: c_int, buf: *c_void, len: size_t,
|
||||
flags: c_int, addr: *sockaddr,
|
||||
addrlen: socklen_t) -> ssize_t;
|
||||
pub fn shutdown(socket: c_int, how: c_int) -> c_int;
|
||||
}
|
||||
}
|
||||
|
||||
@ -3975,6 +3992,7 @@ pub mod funcs {
|
||||
pub fn sendto(socket: SOCKET, buf: *c_void, len: c_int,
|
||||
flags: c_int, addr: *sockaddr,
|
||||
addrlen: c_int) -> c_int;
|
||||
pub fn shutdown(socket: SOCKET, how: c_int) -> c_int;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -206,6 +206,7 @@ pub trait RtioTcpStream : RtioSocket {
|
||||
fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError>;
|
||||
fn letdie(&mut self) -> Result<(), IoError>;
|
||||
fn clone(&self) -> ~RtioTcpStream;
|
||||
fn close_write(&mut self) -> Result<(), IoError>;
|
||||
}
|
||||
|
||||
pub trait RtioSocket {
|
||||
|
Loading…
x
Reference in New Issue
Block a user