std: Add close_{read,write}() methods to I/O

Two new methods were added to TcpStream and UnixStream:

    fn close_read(&mut self) -> IoResult<()>;
    fn close_write(&mut self) -> IoResult<()>;

These two methods map to shutdown()'s behavior (the system call on unix),
closing the reading or writing half of a duplex stream. These methods are
primarily added to allow waking up a pending read in another task. By closing
the reading half of a connection, all pending readers will be woken up and will
return with EndOfFile. The close_write() method was added for symmetry with
close_read(), and I imagine that it will be quite useful at some point.

Implementation-wise, librustuv got the short end of the stick this time. The
native versions just delegate to the shutdown() syscall (easy). The uv versions
can leverage uv_shutdown() for tcp/unix streams, but only for closing the
writing half. Closing the reading half is done through some careful dancing to
wake up a pending reader.

As usual, windows likes to be different from unix. The windows implementation
uses shutdown() for sockets, but shutdown() is not available for named pipes.
Instead, CancelIoEx was used with same fancy synchronization to make sure
everyone knows what's up.

cc #11165
This commit is contained in:
Alex Crichton 2014-04-24 18:48:21 -07:00
parent ef6daf9935
commit ec9ade938e
14 changed files with 535 additions and 99 deletions

View File

@ -118,7 +118,7 @@ pub use consts::os::bsd44::{SOL_SOCKET, SO_KEEPALIVE, SO_ERROR};
pub use consts::os::bsd44::{SO_REUSEADDR, SO_BROADCAST, SHUT_WR, IP_MULTICAST_LOOP};
pub use consts::os::bsd44::{IP_ADD_MEMBERSHIP, IP_DROP_MEMBERSHIP};
pub use consts::os::bsd44::{IPV6_ADD_MEMBERSHIP, IPV6_DROP_MEMBERSHIP};
pub use consts::os::bsd44::{IP_MULTICAST_TTL, IP_TTL};
pub use consts::os::bsd44::{IP_MULTICAST_TTL, IP_TTL, SHUT_RD};
pub use funcs::c95::ctype::{isalnum, isalpha, iscntrl, isdigit};
pub use funcs::c95::ctype::{islower, isprint, ispunct, isspace};
@ -226,6 +226,8 @@ pub use funcs::bsd43::{shutdown};
#[cfg(windows)] pub use consts::os::extra::{FILE_WRITE_ATTRIBUTES, FILE_READ_ATTRIBUTES};
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_BUSY, ERROR_IO_PENDING};
#[cfg(windows)] pub use consts::os::extra::{ERROR_PIPE_CONNECTED, WAIT_OBJECT_0};
#[cfg(windows)] pub use consts::os::extra::{ERROR_NOT_FOUND};
#[cfg(windows)] pub use consts::os::extra::{ERROR_OPERATION_ABORTED};
#[cfg(windows)] pub use types::os::common::bsd44::{SOCKET};
#[cfg(windows)] pub use types::os::common::posix01::{stat, utimbuf};
#[cfg(windows)] pub use types::os::arch::extra::{HANDLE, BOOL, LPSECURITY_ATTRIBUTES};
@ -1740,8 +1742,10 @@ pub mod consts {
pub static ERROR_NO_DATA: c_int = 232;
pub static ERROR_INVALID_ADDRESS : c_int = 487;
pub static ERROR_PIPE_CONNECTED: c_int = 535;
pub static ERROR_OPERATION_ABORTED: c_int = 995;
pub static ERROR_IO_PENDING: c_int = 997;
pub static ERROR_FILE_INVALID : c_int = 1006;
pub static ERROR_NOT_FOUND: c_int = 1168;
pub static INVALID_HANDLE_VALUE : c_int = -1;
pub static DELETE : DWORD = 0x00010000;

View File

@ -61,4 +61,6 @@ extern "system" {
optlen: *mut libc::c_int) -> libc::c_int;
pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL;
pub fn CancelIoEx(hFile: libc::HANDLE,
lpOverlapped: libc::LPOVERLAPPED) -> libc::BOOL;
}

View File

@ -12,12 +12,12 @@
use libc::{c_int, c_void};
use libc;
use std::sync::arc::UnsafeArc;
use std::c_str::CString;
use std::io::IoError;
use std::io;
use std::mem;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
use io::{IoResult, retry, keep_going};
@ -178,6 +178,17 @@ impl rtio::RtioPipe for FileDesc {
fn clone(&self) -> Box<rtio::RtioPipe:Send> {
box FileDesc { inner: self.inner.clone() } as Box<rtio::RtioPipe:Send>
}
// Only supported on named pipes currently. Note that this doesn't have an
// impact on the std::io primitives, this is never called via
// std::io::PipeStream. If the functionality is exposed in the future, then
// these methods will need to be implemented.
fn close_read(&mut self) -> Result<(), IoError> {
Err(io::standard_error(io::InvalidInput))
}
fn close_write(&mut self) -> Result<(), IoError> {
Err(io::standard_error(io::InvalidInput))
}
}
impl rtio::RtioTTY for FileDesc {

View File

@ -210,6 +210,17 @@ impl rtio::RtioPipe for FileDesc {
fn clone(&self) -> Box<rtio::RtioPipe:Send> {
box FileDesc { inner: self.inner.clone() } as Box<rtio::RtioPipe:Send>
}
// Only supported on named pipes currently. Note that this doesn't have an
// impact on the std::io primitives, this is never called via
// std::io::PipeStream. If the functionality is exposed in the future, then
// these methods will need to be implemented.
fn close_read(&mut self) -> IoResult<()> {
Err(io::standard_error(io::InvalidInput))
}
fn close_write(&mut self) -> IoResult<()> {
Err(io::standard_error(io::InvalidInput))
}
}
impl rtio::RtioTTY for FileDesc {

View File

@ -357,9 +357,10 @@ impl rtio::RtioTcpStream for TcpStream {
} as Box<rtio::RtioTcpStream:Send>
}
fn close_write(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe {
libc::shutdown(self.fd(), libc::SHUT_WR)
})
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
}
fn close_read(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
}
}

View File

@ -149,6 +149,13 @@ impl rtio::RtioPipe for UnixStream {
inner: self.inner.clone(),
} as Box<rtio::RtioPipe:Send>
}
fn close_write(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_WR) })
}
fn close_read(&mut self) -> IoResult<()> {
super::mkerr_libc(unsafe { libc::shutdown(self.fd(), libc::SHUT_RD) })
}
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -84,13 +84,17 @@
//! the test suite passing (the suite is in libstd), and that's good enough for
//! me!
use std::c_str::CString;
use libc;
use std::c_str::CString;
use std::intrinsics;
use std::io;
use std::os::win32::as_utf16_p;
use std::os;
use std::ptr;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
use std::intrinsics;
use std::sync::atomics;
use std::unstable::mutex;
use super::IoResult;
use super::c;
@ -124,6 +128,20 @@ impl Drop for Event {
struct Inner {
handle: libc::HANDLE,
lock: mutex::NativeMutex,
read_closed: atomics::AtomicBool,
write_closed: atomics::AtomicBool,
}
impl Inner {
fn new(handle: libc::HANDLE) -> Inner {
Inner {
handle: handle,
lock: unsafe { mutex::NativeMutex::new() },
read_closed: atomics::AtomicBool::new(false),
write_closed: atomics::AtomicBool::new(false),
}
}
}
impl Drop for Inner {
@ -218,7 +236,7 @@ impl UnixStream {
loop {
match UnixStream::try_connect(p) {
Some(handle) => {
let inner = Inner { handle: handle };
let inner = Inner::new(handle);
let mut mode = libc::PIPE_TYPE_BYTE |
libc::PIPE_READMODE_BYTE |
libc::PIPE_WAIT;
@ -275,6 +293,24 @@ impl UnixStream {
}
fn handle(&self) -> libc::HANDLE { unsafe { (*self.inner.get()).handle } }
fn read_closed(&self) -> bool {
unsafe { (*self.inner.get()).read_closed.load(atomics::SeqCst) }
}
fn write_closed(&self) -> bool {
unsafe { (*self.inner.get()).write_closed.load(atomics::SeqCst) }
}
fn cancel_io(&self) -> IoResult<()> {
match unsafe { c::CancelIoEx(self.handle(), ptr::mut_null()) } {
0 if os::errno() == libc::ERROR_NOT_FOUND as uint => {
Ok(())
}
0 => Err(super::last_error()),
_ => Ok(())
}
}
}
impl rtio::RtioPipe for UnixStream {
@ -287,6 +323,18 @@ impl rtio::RtioPipe for UnixStream {
let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() };
overlapped.hEvent = self.read.get_ref().handle();
// Pre-flight check to see if the reading half has been closed. This
// must be done before issuing the ReadFile request, but after we
// acquire the lock.
//
// See comments in close_read() about why this lock is necessary.
let guard = unsafe { (*self.inner.get()).lock.lock() };
if self.read_closed() {
return Err(io::standard_error(io::EndOfFile))
}
// Issue a nonblocking requests, succeeding quickly if it happened to
// succeed.
let ret = unsafe {
libc::ReadFile(self.handle(),
buf.as_ptr() as libc::LPVOID,
@ -294,24 +342,41 @@ impl rtio::RtioPipe for UnixStream {
&mut bytes_read,
&mut overlapped)
};
if ret == 0 {
let err = unsafe { libc::GetLastError() };
if err == libc::ERROR_IO_PENDING as libc::DWORD {
let ret = unsafe {
libc::GetOverlappedResult(self.handle(),
&mut overlapped,
&mut bytes_read,
libc::TRUE)
};
if ret == 0 {
return Err(super::last_error())
}
} else {
return Err(super::last_error())
}
if ret != 0 { return Ok(bytes_read as uint) }
// If our errno doesn't say that the I/O is pending, then we hit some
// legitimate error and reeturn immediately.
if os::errno() != libc::ERROR_IO_PENDING as uint {
return Err(super::last_error())
}
Ok(bytes_read as uint)
// Now that we've issued a successful nonblocking request, we need to
// wait for it to finish. This can all be done outside the lock because
// we'll see any invocation of CancelIoEx. We also call this in a loop
// because we're woken up if the writing half is closed, we just need to
// realize that the reading half wasn't closed and we go right back to
// sleep.
drop(guard);
loop {
let ret = unsafe {
libc::GetOverlappedResult(self.handle(),
&mut overlapped,
&mut bytes_read,
libc::TRUE)
};
// If we succeeded, or we failed for some reason other than
// CancelIoEx, return immediately
if ret != 0 { return Ok(bytes_read as uint) }
if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
return Err(super::last_error())
}
// If the reading half is now closed, then we're done. If we woke up
// because the writing half was closed, keep trying.
if self.read_closed() {
return Err(io::standard_error(io::EndOfFile))
}
}
}
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
@ -325,6 +390,17 @@ impl rtio::RtioPipe for UnixStream {
while offset < buf.len() {
let mut bytes_written = 0;
// This sequence below is quite similar to the one found in read().
// Some careful looping is done to ensure that if close_write() is
// invoked we bail out early, and if close_read() is invoked we keep
// going after we woke up.
//
// See comments in close_read() about why this lock is necessary.
let guard = unsafe { (*self.inner.get()).lock.lock() };
if self.write_closed() {
return Err(io::standard_error(io::BrokenPipe))
}
let ret = unsafe {
libc::WriteFile(self.handle(),
buf.slice_from(offset).as_ptr() as libc::LPVOID,
@ -332,20 +408,29 @@ impl rtio::RtioPipe for UnixStream {
&mut bytes_written,
&mut overlapped)
};
drop(guard);
if ret == 0 {
let err = unsafe { libc::GetLastError() };
if err == libc::ERROR_IO_PENDING as libc::DWORD {
let ret = unsafe {
libc::GetOverlappedResult(self.handle(),
&mut overlapped,
&mut bytes_written,
libc::TRUE)
};
if ret == 0 {
if os::errno() != libc::ERROR_IO_PENDING as uint {
return Err(super::last_error())
}
let ret = unsafe {
libc::GetOverlappedResult(self.handle(),
&mut overlapped,
&mut bytes_written,
libc::TRUE)
};
// If we weren't aborted, this was a legit error, if we were
// aborted, then check to see if the write half was actually
// closed or whether we woke up from the read half closing.
if ret == 0 {
if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
return Err(super::last_error())
}
} else {
return Err(super::last_error())
if self.write_closed() {
return Err(io::standard_error(io::BrokenPipe))
}
continue; // retry
}
}
offset += bytes_written as uint;
@ -360,6 +445,36 @@ impl rtio::RtioPipe for UnixStream {
write: None,
} as Box<rtio::RtioPipe:Send>
}
fn close_read(&mut self) -> IoResult<()> {
// On windows, there's no actual shutdown() method for pipes, so we're
// forced to emulate the behavior manually at the application level. To
// do this, we need to both cancel any pending requests, as well as
// prevent all future requests from succeeding. These two operations are
// not atomic with respect to one another, so we must use a lock to do
// so.
//
// The read() code looks like:
//
// 1. Make sure the pipe is still open
// 2. Submit a read request
// 3. Wait for the read request to finish
//
// The race this lock is preventing is if another thread invokes
// close_read() between steps 1 and 2. By atomically executing steps 1
// and 2 with a lock with respect to close_read(), we're guaranteed that
// no thread will erroneously sit in a read forever.
let _guard = unsafe { (*self.inner.get()).lock.lock() };
unsafe { (*self.inner.get()).read_closed.store(true, atomics::SeqCst) }
self.cancel_io()
}
fn close_write(&mut self) -> IoResult<()> {
// see comments in close_read() for why this lock is necessary
let _guard = unsafe { (*self.inner.get()).lock.lock() };
unsafe { (*self.inner.get()).write_closed.store(true, atomics::SeqCst) }
self.cancel_io()
}
}
////////////////////////////////////////////////////////////////////////////////
@ -520,7 +635,7 @@ impl UnixAcceptor {
// Transfer ownership of our handle into this stream
Ok(UnixStream {
inner: UnsafeArc::new(Inner { handle: handle }),
inner: UnsafeArc::new(Inner::new(handle)),
read: None,
write: None,
})

View File

@ -33,6 +33,7 @@ pub struct Guard<'a> {
struct Inner {
queue: Vec<BlockedTask>,
held: bool,
closed: bool,
}
impl Access {
@ -41,6 +42,7 @@ impl Access {
inner: UnsafeArc::new(Inner {
queue: vec![],
held: false,
closed: false,
})
}
}
@ -64,6 +66,15 @@ impl Access {
Guard { access: self, missile: Some(missile) }
}
pub fn close(&self, _missile: &HomingMissile) {
// This unsafety is OK because with a homing missile we're guaranteed to
// be the only task looking at the `closed` flag (and are therefore
// allowed to modify it). Additionally, no atomics are necessary because
// everyone's running on the same thread and has already done the
// necessary synchronization to be running on this thread.
unsafe { (*self.inner.get()).closed = true; }
}
}
impl Clone for Access {
@ -72,6 +83,14 @@ impl Clone for Access {
}
}
impl<'a> Guard<'a> {
pub fn is_closed(&self) -> bool {
// See above for why this unsafety is ok, it just applies to the read
// instead of the write.
unsafe { (*self.access.inner.get()).closed }
}
}
#[unsafe_destructor]
impl<'a> Drop for Guard<'a> {
fn drop(&mut self) {

View File

@ -11,6 +11,7 @@
use libc::{size_t, ssize_t, c_int, c_void, c_uint};
use libc;
use std::cast;
use std::io;
use std::io::{IoError, IoResult};
use std::io::net::ip;
use std::mem;
@ -411,7 +412,13 @@ impl rtio::RtioSocket for TcpWatcher {
impl rtio::RtioTcpStream for TcpWatcher {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
let m = self.fire_homing_missile();
let _g = self.read_access.grant(m);
let access = self.read_access.grant(m);
// see comments in close_read about this check
if access.is_closed() {
return Err(io::standard_error(io::EndOfFile))
}
self.stream.read(buf).map_err(uv_error_to_io_error)
}
@ -466,36 +473,17 @@ impl rtio::RtioTcpStream for TcpWatcher {
} as Box<rtio::RtioTcpStream:Send>
}
fn close_read(&mut self) -> Result<(), IoError> {
// see comments in PipeWatcher::close_read
let m = self.fire_homing_missile();
self.read_access.close(&m);
self.stream.cancel_read(m);
Ok(())
}
fn close_write(&mut self) -> Result<(), IoError> {
struct Ctx {
slot: Option<BlockedTask>,
status: c_int,
}
let mut req = Request::new(uvll::UV_SHUTDOWN);
return match unsafe {
uvll::uv_shutdown(req.handle, self.handle, shutdown_cb)
} {
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { slot: None, status: 0 };
wait_until_woken_after(&mut cx.slot, &self.uv_loop(), || {
req.set_data(&cx);
});
status_to_io_result(cx.status)
}
n => Err(uv_error_to_io_error(UvError(n)))
};
extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
let req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
let cx: &mut Ctx = unsafe { req.get_data() };
cx.status = status;
wakeup(&mut cx.slot);
}
let _m = self.fire_homing_missile();
shutdown(self.handle, &self.uv_loop())
}
}
@ -704,7 +692,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
let m = self.fire_homing_missile();
let _g = self.read_access.grant(m);
let a = match unsafe {
return match unsafe {
uvll::uv_udp_recv_start(self.handle, alloc_cb, recv_cb)
} {
0 => {
@ -725,14 +713,12 @@ impl rtio::RtioUdpSocket for UdpWatcher {
}
n => Err(uv_error_to_io_error(UvError(n)))
};
return a;
extern fn alloc_cb(handle: *uvll::uv_udp_t,
_suggested_size: size_t,
buf: *mut Buf) {
unsafe {
let cx: &mut Ctx =
cast::transmute(uvll::get_data_for_uv_handle(handle));
let cx = &mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx);
*buf = cx.buf.take().expect("recv alloc_cb called more than once")
}
}
@ -740,8 +726,8 @@ impl rtio::RtioUdpSocket for UdpWatcher {
extern fn recv_cb(handle: *uvll::uv_udp_t, nread: ssize_t, buf: *Buf,
addr: *libc::sockaddr, _flags: c_uint) {
assert!(nread != uvll::ECANCELED as ssize_t);
let cx: &mut Ctx = unsafe {
cast::transmute(uvll::get_data_for_uv_handle(handle))
let cx = unsafe {
&mut *(uvll::get_data_for_uv_handle(handle) as *mut Ctx)
};
// When there's no data to read the recv callback can be a no-op.
@ -752,13 +738,7 @@ impl rtio::RtioUdpSocket for UdpWatcher {
return
}
unsafe {
assert_eq!(uvll::uv_udp_recv_stop(handle), 0)
}
let cx: &mut Ctx = unsafe {
cast::transmute(uvll::get_data_for_uv_handle(handle))
};
unsafe { assert_eq!(uvll::uv_udp_recv_stop(handle), 0) }
let addr = if addr == ptr::null() {
None
} else {
@ -900,6 +880,40 @@ impl Drop for UdpWatcher {
}
}
////////////////////////////////////////////////////////////////////////////////
// Shutdown helper
////////////////////////////////////////////////////////////////////////////////
pub fn shutdown(handle: *uvll::uv_stream_t, loop_: &Loop) -> Result<(), IoError> {
struct Ctx {
slot: Option<BlockedTask>,
status: c_int,
}
let mut req = Request::new(uvll::UV_SHUTDOWN);
return match unsafe { uvll::uv_shutdown(req.handle, handle, shutdown_cb) } {
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { slot: None, status: 0 };
wait_until_woken_after(&mut cx.slot, loop_, || {
req.set_data(&cx);
});
status_to_io_result(cx.status)
}
n => Err(uv_error_to_io_error(UvError(n)))
};
extern fn shutdown_cb(req: *uvll::uv_shutdown_t, status: libc::c_int) {
let req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
let cx: &mut Ctx = unsafe { req.get_data() };
cx.status = status;
wakeup(&mut cx.slot);
}
}
#[cfg(test)]
mod test {
use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,

View File

@ -11,6 +11,7 @@
use libc;
use std::c_str::CString;
use std::io::IoError;
use std::io;
use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
use access::Access;
@ -111,7 +112,13 @@ impl PipeWatcher {
impl RtioPipe for PipeWatcher {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
let m = self.fire_homing_missile();
let _g = self.read_access.grant(m);
let access = self.read_access.grant(m);
// see comments in close_read about this check
if access.is_closed() {
return Err(io::standard_error(io::EndOfFile))
}
self.stream.read(buf).map_err(uv_error_to_io_error)
}
@ -131,6 +138,35 @@ impl RtioPipe for PipeWatcher {
write_access: self.write_access.clone(),
} as Box<RtioPipe:Send>
}
fn close_read(&mut self) -> Result<(), IoError> {
// The current uv_shutdown method only shuts the writing half of the
// connection, and no method is provided to shut down the reading half
// of the connection. With a lack of method, we emulate shutting down
// the reading half of the connection by manually returning early from
// all future calls to `read`.
//
// Note that we must be careful to ensure that *all* cloned handles see
// the closing of the read half, so we stored the "is closed" bit in the
// Access struct, not in our own personal watcher. Additionally, the
// homing missile is used as a locking mechanism to ensure there is no
// contention over this bit.
//
// To shutdown the read half, we must first flag the access as being
// closed, and then afterwards we cease any pending read. Note that this
// ordering is crucial because we could in theory be rescheduled during
// the uv_read_stop which means that another read invocation could leak
// in before we set the flag.
let m = self.fire_homing_missile();
self.read_access.close(&m);
self.stream.cancel_read(m);
Ok(())
}
fn close_write(&mut self) -> Result<(), IoError> {
let _m = self.fire_homing_missile();
net::shutdown(self.stream.handle, &self.uv_loop())
}
}
impl HomingIO for PipeWatcher {

View File

@ -14,6 +14,7 @@ use std::ptr;
use std::rt::task::BlockedTask;
use Loop;
use homing::HomingMissile;
use super::{UvError, Buf, slice_to_uv_buf, Request, wait_until_woken_after,
ForbidUnwind, wakeup};
use uvll;
@ -57,6 +58,7 @@ impl StreamWatcher {
// Wrappers should ensure to always reset the field to an appropriate value
// if they rely on the field to perform an action.
pub fn new(stream: *uvll::uv_stream_t) -> StreamWatcher {
unsafe { uvll::set_data_for_uv_handle(stream, 0 as *int) }
StreamWatcher {
handle: stream,
last_write_req: None,
@ -70,7 +72,9 @@ impl StreamWatcher {
let mut rcx = ReadContext {
buf: Some(slice_to_uv_buf(buf)),
result: 0,
// if the read is canceled, we'll see eof, otherwise this will get
// overwritten
result: uvll::EOF as ssize_t,
task: None,
};
// When reading a TTY stream on windows, libuv will invoke alloc_cb
@ -78,13 +82,11 @@ impl StreamWatcher {
// we must be ready for this to happen (by setting the data in the uv
// handle). In theory this otherwise doesn't need to happen until after
// the read is succesfully started.
unsafe {
uvll::set_data_for_uv_handle(self.handle, &rcx)
}
unsafe { uvll::set_data_for_uv_handle(self.handle, &rcx) }
// Send off the read request, but don't block until we're sure that the
// read request is queued.
match unsafe {
let ret = match unsafe {
uvll::uv_read_start(self.handle, alloc_cb, read_cb)
} {
0 => {
@ -96,6 +98,29 @@ impl StreamWatcher {
}
}
n => Err(UvError(n))
};
// Make sure a read cancellation sees that there's no pending read
unsafe { uvll::set_data_for_uv_handle(self.handle, 0 as *int) }
return ret;
}
pub fn cancel_read(&mut self, m: HomingMissile) {
// When we invoke uv_read_stop, it cancels the read and alloc
// callbacks. We need to manually wake up a pending task (if one was
// present). Note that we wake up the task *outside* the homing missile
// to ensure that we don't switch schedulers when we're not supposed to.
assert_eq!(unsafe { uvll::uv_read_stop(self.handle) }, 0);
let data = unsafe {
let data = uvll::get_data_for_uv_handle(self.handle);
if data.is_null() { return }
uvll::set_data_for_uv_handle(self.handle, 0 as *int);
&mut *(data as *mut ReadContext)
};
let task = data.task.take();
drop(m);
match task {
Some(task) => { let _ = task.wake().map(|t| t.reawaken()); }
None => {}
}
}

View File

@ -32,7 +32,7 @@ use rt::rtio::{RtioTcpAcceptor, RtioTcpStream};
///
/// # Example
///
/// ```rust
/// ```no_run
/// # #![allow(unused_must_use)]
/// use std::io::net::tcp::TcpStream;
/// use std::io::net::ip::{Ipv4Addr, SocketAddr};
@ -109,6 +109,48 @@ impl TcpStream {
None => self.obj.letdie(),
}
}
/// Closes the reading half of this connection.
///
/// This method will close the reading portion of this connection, causing
/// all pending and future reads to immediately return with an error.
///
/// # Example
///
/// ```no_run
/// # #![allow(unused_must_use)]
/// use std::io::timer;
/// use std::io::net::tcp::TcpStream;
/// use std::io::net::ip::{Ipv4Addr, SocketAddr};
///
/// let addr = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 34254 };
/// let mut stream = TcpStream::connect(addr).unwrap();
/// let stream2 = stream.clone();
///
/// spawn(proc() {
/// // close this stream after one second
/// timer::sleep(1000);
/// let mut stream = stream2;
/// stream.close_read();
/// });
///
/// // wait for some data, will get canceled after one second
/// let mut buf = [0];
/// stream.read(buf);
/// ```
///
/// Note that this method affects all cloned handles associated with this
/// stream, not just this one handle.
pub fn close_read(&mut self) -> IoResult<()> { self.obj.close_read() }
/// Closes the writing half of this connection.
///
/// This method will close the writing portion of this connection, causing
/// all future writes to immediately return with an error.
///
/// Note that this method affects all cloned handles associated with this
/// stream, not just this one handle.
pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() }
}
impl Clone for TcpStream {
@ -839,7 +881,11 @@ mod test {
// Also make sure that even though the timeout is expired that we will
// continue to receive any pending connections.
let l = TcpStream::connect(addr).unwrap();
let (tx, rx) = channel();
spawn(proc() {
tx.send(TcpStream::connect(addr).unwrap());
});
let l = rx.recv();
for i in range(0, 1001) {
match a.accept() {
Ok(..) => break,
@ -853,8 +899,69 @@ mod test {
// Unset the timeout and make sure that this always blocks.
a.set_timeout(None);
spawn(proc() {
drop(TcpStream::connect(addr));
drop(TcpStream::connect(addr).unwrap());
});
a.accept().unwrap();
})
iotest!(fn close_readwrite_smoke() {
let addr = next_test_ip4();
let a = TcpListener::bind(addr).listen().unwrap();
let (_tx, rx) = channel::<()>();
spawn(proc() {
let mut a = a;
let _s = a.accept().unwrap();
let _ = rx.recv_opt();
});
let mut b = [0];
let mut s = TcpStream::connect(addr).unwrap();
let mut s2 = s.clone();
// closing should prevent reads/writes
s.close_write().unwrap();
assert!(s.write([0]).is_err());
s.close_read().unwrap();
assert!(s.read(b).is_err());
// closing should affect previous handles
assert!(s2.write([0]).is_err());
assert!(s2.read(b).is_err());
// closing should affect new handles
let mut s3 = s.clone();
assert!(s3.write([0]).is_err());
assert!(s3.read(b).is_err());
// make sure these don't die
let _ = s2.close_read();
let _ = s2.close_write();
let _ = s3.close_read();
let _ = s3.close_write();
})
iotest!(fn close_read_wakes_up() {
let addr = next_test_ip4();
let a = TcpListener::bind(addr).listen().unwrap();
let (_tx, rx) = channel::<()>();
spawn(proc() {
let mut a = a;
let _s = a.accept().unwrap();
let _ = rx.recv_opt();
});
let mut s = TcpStream::connect(addr).unwrap();
let s2 = s.clone();
let (tx, rx) = channel();
spawn(proc() {
let mut s2 = s2;
assert!(s2.read([0]).is_err());
tx.send(());
});
// this should wake up the child task
s.close_read().unwrap();
// this test will never finish if the child doesn't wake up
rx.recv();
})
}

View File

@ -28,7 +28,6 @@ use prelude::*;
use c_str::ToCStr;
use clone::Clone;
use io::pipe::PipeStream;
use io::{Listener, Acceptor, Reader, Writer, IoResult};
use kinds::Send;
use owned::Box;
@ -37,14 +36,10 @@ use rt::rtio::{RtioUnixAcceptor, RtioPipe};
/// A stream which communicates over a named pipe.
pub struct UnixStream {
obj: PipeStream,
obj: Box<RtioPipe:Send>,
}
impl UnixStream {
fn new(obj: Box<RtioPipe:Send>) -> UnixStream {
UnixStream { obj: PipeStream::new(obj) }
}
/// Connect to a pipe named by `path`. This will attempt to open a
/// connection to the underlying socket.
///
@ -62,7 +57,7 @@ impl UnixStream {
/// ```
pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> {
LocalIo::maybe_raise(|io| {
io.unix_connect(&path.to_c_str(), None).map(UnixStream::new)
io.unix_connect(&path.to_c_str(), None).map(|p| UnixStream { obj: p })
})
}
@ -86,9 +81,28 @@ impl UnixStream {
timeout_ms: u64) -> IoResult<UnixStream> {
LocalIo::maybe_raise(|io| {
let s = io.unix_connect(&path.to_c_str(), Some(timeout_ms));
s.map(UnixStream::new)
s.map(|p| UnixStream { obj: p })
})
}
/// Closes the reading half of this connection.
///
/// This method will close the reading portion of this connection, causing
/// all pending and future reads to immediately return with an error.
///
/// Note that this method affects all cloned handles associated with this
/// stream, not just this one handle.
pub fn close_read(&mut self) -> IoResult<()> { self.obj.close_read() }
/// Closes the writing half of this connection.
///
/// This method will close the writing portion of this connection, causing
/// all pending and future writes to immediately return with an error.
///
/// Note that this method affects all cloned handles associated with this
/// stream, not just this one handle.
pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() }
}
impl Clone for UnixStream {
@ -174,7 +188,7 @@ impl UnixAcceptor {
impl Acceptor<UnixStream> for UnixAcceptor {
fn accept(&mut self) -> IoResult<UnixStream> {
self.obj.accept().map(UnixStream::new)
self.obj.accept().map(|s| UnixStream { obj: s })
}
}
@ -431,7 +445,12 @@ mod tests {
// Also make sure that even though the timeout is expired that we will
// continue to receive any pending connections.
let l = UnixStream::connect(&addr).unwrap();
let (tx, rx) = channel();
let addr2 = addr.clone();
spawn(proc() {
tx.send(UnixStream::connect(&addr2).unwrap());
});
let l = rx.recv();
for i in range(0, 1001) {
match a.accept() {
Ok(..) => break,
@ -446,7 +465,7 @@ mod tests {
a.set_timeout(None);
let addr2 = addr.clone();
spawn(proc() {
drop(UnixStream::connect(&addr2));
drop(UnixStream::connect(&addr2).unwrap());
});
a.accept().unwrap();
})
@ -461,4 +480,65 @@ mod tests {
let _a = UnixListener::bind(&addr).unwrap().listen().unwrap();
assert!(UnixStream::connect_timeout(&addr, 100).is_ok());
})
iotest!(fn close_readwrite_smoke() {
let addr = next_test_unix();
let a = UnixListener::bind(&addr).listen().unwrap();
let (_tx, rx) = channel::<()>();
spawn(proc() {
let mut a = a;
let _s = a.accept().unwrap();
let _ = rx.recv_opt();
});
let mut b = [0];
let mut s = UnixStream::connect(&addr).unwrap();
let mut s2 = s.clone();
// closing should prevent reads/writes
s.close_write().unwrap();
assert!(s.write([0]).is_err());
s.close_read().unwrap();
assert!(s.read(b).is_err());
// closing should affect previous handles
assert!(s2.write([0]).is_err());
assert!(s2.read(b).is_err());
// closing should affect new handles
let mut s3 = s.clone();
assert!(s3.write([0]).is_err());
assert!(s3.read(b).is_err());
// make sure these don't die
let _ = s2.close_read();
let _ = s2.close_write();
let _ = s3.close_read();
let _ = s3.close_write();
})
iotest!(fn close_read_wakes_up() {
let addr = next_test_unix();
let a = UnixListener::bind(&addr).listen().unwrap();
let (_tx, rx) = channel::<()>();
spawn(proc() {
let mut a = a;
let _s = a.accept().unwrap();
let _ = rx.recv_opt();
});
let mut s = UnixStream::connect(&addr).unwrap();
let s2 = s.clone();
let (tx, rx) = channel();
spawn(proc() {
let mut s2 = s2;
assert!(s2.read([0]).is_err());
tx.send(());
});
// this should wake up the child task
s.close_read().unwrap();
// this test will never finish if the child doesn't wake up
rx.recv();
})
}

View File

@ -221,6 +221,7 @@ pub trait RtioTcpStream : RtioSocket {
fn letdie(&mut self) -> IoResult<()>;
fn clone(&self) -> Box<RtioTcpStream:Send>;
fn close_write(&mut self) -> IoResult<()>;
fn close_read(&mut self) -> IoResult<()>;
}
pub trait RtioSocket {
@ -274,6 +275,9 @@ pub trait RtioPipe {
fn read(&mut self, buf: &mut [u8]) -> IoResult<uint>;
fn write(&mut self, buf: &[u8]) -> IoResult<()>;
fn clone(&self) -> Box<RtioPipe:Send>;
fn close_write(&mut self) -> IoResult<()>;
fn close_read(&mut self) -> IoResult<()>;
}
pub trait RtioUnixListener {