From 373878573569371be876bb838607c64076f17f49 Mon Sep 17 00:00:00 2001 From: The 8472 Date: Tue, 6 Jun 2023 01:09:05 +0200 Subject: [PATCH] Extend io::copy buffer reuse to BufReader too previously it was only able to use BufWriter. This was due to a limitation in the BufReader generics that prevented specialization. This change works around the issue by using `where Self: Read` instead of `where I: Read`. This limits our options, e.g. we can't access BufRead methods, but it happens to work out if we rely on some implementation details. --- library/std/src/io/buffered/bufreader.rs | 2 +- library/std/src/io/copy.rs | 110 ++++++++++++++++++++--- library/std/src/io/copy/tests.rs | 108 ++++++++++++++++++++++ library/std/src/io/util/tests.rs | 61 +------------ 4 files changed, 207 insertions(+), 74 deletions(-) create mode 100644 library/std/src/io/copy/tests.rs diff --git a/library/std/src/io/buffered/bufreader.rs b/library/std/src/io/buffered/bufreader.rs index 3edf9d747ce..a66e6ccf673 100644 --- a/library/std/src/io/buffered/bufreader.rs +++ b/library/std/src/io/buffered/bufreader.rs @@ -222,7 +222,7 @@ pub fn into_inner(self) -> R /// Invalidates all data in the internal buffer. #[inline] - fn discard_buffer(&mut self) { + pub(in crate::io) fn discard_buffer(&mut self) { self.buf.discard_buffer() } } diff --git a/library/std/src/io/copy.rs b/library/std/src/io/copy.rs index 420fc400705..ef1f4031ef2 100644 --- a/library/std/src/io/copy.rs +++ b/library/std/src/io/copy.rs @@ -1,6 +1,9 @@ -use super::{BorrowedBuf, BufWriter, ErrorKind, Read, Result, Write, DEFAULT_BUF_SIZE}; +use super::{BorrowedBuf, BufReader, BufWriter, ErrorKind, Read, Result, Write, DEFAULT_BUF_SIZE}; use crate::mem::MaybeUninit; +#[cfg(test)] +mod tests; + /// Copies the entire contents of a reader into a writer. /// /// This function will continuously read data from `reader` and then @@ -71,32 +74,113 @@ pub(crate) fn generic_copy(reader: &mut R, writer: &mut W) R: Read, W: Write, { - BufferedCopySpec::copy_to(reader, writer) + let read_buf = BufferedReaderSpec::buffer_size(reader); + let write_buf = BufferedWriterSpec::buffer_size(writer); + + if read_buf >= DEFAULT_BUF_SIZE && read_buf >= write_buf { + return BufferedReaderSpec::copy_to(reader, writer); + } + + BufferedWriterSpec::copy_from(writer, reader) +} + +/// Specialization of the read-write loop that reuses the internal +/// buffer of a BufReader. If there's no buffer then the writer side +/// should be used intead. +trait BufferedReaderSpec { + fn buffer_size(&self) -> usize; + + fn copy_to(&mut self, to: &mut (impl Write + ?Sized)) -> Result; +} + +impl BufferedReaderSpec for T +where + Self: Read, + T: ?Sized, +{ + #[inline] + default fn buffer_size(&self) -> usize { + 0 + } + + default fn copy_to(&mut self, _to: &mut (impl Write + ?Sized)) -> Result { + unimplemented!("only called from specializations"); + } +} + +impl BufferedReaderSpec for BufReader +where + Self: Read, + I: ?Sized, +{ + fn buffer_size(&self) -> usize { + self.capacity() + } + + fn copy_to(&mut self, to: &mut (impl Write + ?Sized)) -> Result { + let mut len = 0; + + loop { + // Hack: this relies on `impl Read for BufReader` always calling fill_buf + // if the buffer is empty, even for empty slices. + // It can't be called directly here since specialization prevents us + // from adding I: Read + match self.read(&mut []) { + Ok(_) => {} + Err(e) if e.kind() == ErrorKind::Interrupted => continue, + Err(e) => return Err(e), + } + let buf = self.buffer(); + if self.buffer().len() == 0 { + return Ok(len); + } + + // In case the writer side is a BufWriter then its write_all + // implements an optimization that passes through large + // buffers to the underlying writer. That code path is #[cold] + // but we're still avoiding redundant memcopies when doing + // a copy between buffered inputs and outputs. + to.write_all(buf)?; + len += buf.len() as u64; + self.discard_buffer(); + } + } } /// Specialization of the read-write loop that either uses a stack buffer /// or reuses the internal buffer of a BufWriter -trait BufferedCopySpec: Write { - fn copy_to(reader: &mut R, writer: &mut Self) -> Result; +trait BufferedWriterSpec: Write { + fn buffer_size(&self) -> usize; + + fn copy_from(&mut self, reader: &mut R) -> Result; } -impl BufferedCopySpec for W { - default fn copy_to(reader: &mut R, writer: &mut Self) -> Result { - stack_buffer_copy(reader, writer) +impl BufferedWriterSpec for W { + #[inline] + default fn buffer_size(&self) -> usize { + 0 + } + + default fn copy_from(&mut self, reader: &mut R) -> Result { + stack_buffer_copy(reader, self) } } -impl BufferedCopySpec for BufWriter { - fn copy_to(reader: &mut R, writer: &mut Self) -> Result { - if writer.capacity() < DEFAULT_BUF_SIZE { - return stack_buffer_copy(reader, writer); +impl BufferedWriterSpec for BufWriter { + fn buffer_size(&self) -> usize { + self.capacity() + } + + fn copy_from(&mut self, reader: &mut R) -> Result { + if self.capacity() < DEFAULT_BUF_SIZE { + return stack_buffer_copy(reader, self); } let mut len = 0; let mut init = 0; loop { - let buf = writer.buffer_mut(); + let buf = self.buffer_mut(); let mut read_buf: BorrowedBuf<'_> = buf.spare_capacity_mut().into(); unsafe { @@ -127,7 +211,7 @@ fn copy_to(reader: &mut R, writer: &mut Self) -> Result { Err(e) => return Err(e), } } else { - writer.flush_buf()?; + self.flush_buf()?; init = 0; } } diff --git a/library/std/src/io/copy/tests.rs b/library/std/src/io/copy/tests.rs new file mode 100644 index 00000000000..8c816af15b8 --- /dev/null +++ b/library/std/src/io/copy/tests.rs @@ -0,0 +1,108 @@ +use crate::cmp::{max, min}; +use crate::io::*; + +#[test] +fn copy_copies() { + let mut r = repeat(0).take(4); + let mut w = sink(); + assert_eq!(copy(&mut r, &mut w).unwrap(), 4); + + let mut r = repeat(0).take(1 << 17); + assert_eq!(copy(&mut r as &mut dyn Read, &mut w as &mut dyn Write).unwrap(), 1 << 17); +} + +struct ShortReader { + cap: usize, + read_size: usize, + observed_buffer: usize, +} + +impl Read for ShortReader { + fn read(&mut self, buf: &mut [u8]) -> Result { + let bytes = min(self.cap, self.read_size); + self.cap -= bytes; + self.observed_buffer = max(self.observed_buffer, buf.len()); + Ok(bytes) + } +} + +struct WriteObserver { + observed_buffer: usize, +} + +impl Write for WriteObserver { + fn write(&mut self, buf: &[u8]) -> Result { + self.observed_buffer = max(self.observed_buffer, buf.len()); + Ok(buf.len()) + } + + fn flush(&mut self) -> Result<()> { + Ok(()) + } +} + +#[test] +fn copy_specializes_bufwriter() { + let cap = 117 * 1024; + let buf_sz = 16 * 1024; + let mut r = ShortReader { cap, observed_buffer: 0, read_size: 1337 }; + let mut w = BufWriter::with_capacity(buf_sz, WriteObserver { observed_buffer: 0 }); + assert_eq!( + copy(&mut r, &mut w).unwrap(), + cap as u64, + "expected the whole capacity to be copied" + ); + assert_eq!(r.observed_buffer, buf_sz, "expected a large buffer to be provided to the reader"); + assert!(w.get_mut().observed_buffer > DEFAULT_BUF_SIZE, "expected coalesced writes"); +} + +#[test] +fn copy_specializes_bufreader() { + let mut source = vec![0; 768 * 1024]; + source[1] = 42; + let mut buffered = BufReader::with_capacity(256 * 1024, Cursor::new(&mut source)); + + let mut sink = Vec::new(); + assert_eq!(crate::io::copy(&mut buffered, &mut sink).unwrap(), source.len() as u64); + assert_eq!(source.as_slice(), sink.as_slice()); + + let buf_sz = 71 * 1024; + assert!(buf_sz > DEFAULT_BUF_SIZE, "test precondition"); + + let mut buffered = BufReader::with_capacity(buf_sz, Cursor::new(&mut source)); + let mut sink = WriteObserver { observed_buffer: 0 }; + assert_eq!(crate::io::copy(&mut buffered, &mut sink).unwrap(), source.len() as u64); + assert_eq!( + sink.observed_buffer, buf_sz, + "expected a large buffer to be provided to the writer" + ); +} + +#[cfg(unix)] +mod io_benches { + use crate::fs::File; + use crate::fs::OpenOptions; + use crate::io::prelude::*; + use crate::io::BufReader; + + use test::Bencher; + + #[bench] + fn bench_copy_buf_reader(b: &mut Bencher) { + let mut file_in = File::open("/dev/zero").expect("opening /dev/zero failed"); + // use dyn to avoid specializations unrelated to readbuf + let dyn_in = &mut file_in as &mut dyn Read; + let mut reader = BufReader::with_capacity(256 * 1024, dyn_in.take(0)); + let mut writer = + OpenOptions::new().write(true).open("/dev/null").expect("opening /dev/null failed"); + + const BYTES: u64 = 1024 * 1024; + + b.bytes = BYTES; + + b.iter(|| { + reader.get_mut().set_limit(BYTES); + crate::io::copy(&mut reader, &mut writer).unwrap() + }); + } +} diff --git a/library/std/src/io/util/tests.rs b/library/std/src/io/util/tests.rs index ce5e2c9da1d..1baa94e64c9 100644 --- a/library/std/src/io/util/tests.rs +++ b/library/std/src/io/util/tests.rs @@ -1,67 +1,8 @@ -use crate::cmp::{max, min}; use crate::io::prelude::*; -use crate::io::{ - copy, empty, repeat, sink, BorrowedBuf, BufWriter, Empty, Repeat, Result, SeekFrom, Sink, - DEFAULT_BUF_SIZE, -}; +use crate::io::{empty, repeat, sink, BorrowedBuf, Empty, Repeat, SeekFrom, Sink}; use crate::mem::MaybeUninit; -#[test] -fn copy_copies() { - let mut r = repeat(0).take(4); - let mut w = sink(); - assert_eq!(copy(&mut r, &mut w).unwrap(), 4); - - let mut r = repeat(0).take(1 << 17); - assert_eq!(copy(&mut r as &mut dyn Read, &mut w as &mut dyn Write).unwrap(), 1 << 17); -} - -struct ShortReader { - cap: usize, - read_size: usize, - observed_buffer: usize, -} - -impl Read for ShortReader { - fn read(&mut self, buf: &mut [u8]) -> Result { - let bytes = min(self.cap, self.read_size); - self.cap -= bytes; - self.observed_buffer = max(self.observed_buffer, buf.len()); - Ok(bytes) - } -} - -struct WriteObserver { - observed_buffer: usize, -} - -impl Write for WriteObserver { - fn write(&mut self, buf: &[u8]) -> Result { - self.observed_buffer = max(self.observed_buffer, buf.len()); - Ok(buf.len()) - } - - fn flush(&mut self) -> Result<()> { - Ok(()) - } -} - -#[test] -fn copy_specializes_bufwriter() { - let cap = 117 * 1024; - let buf_sz = 16 * 1024; - let mut r = ShortReader { cap, observed_buffer: 0, read_size: 1337 }; - let mut w = BufWriter::with_capacity(buf_sz, WriteObserver { observed_buffer: 0 }); - assert_eq!( - copy(&mut r, &mut w).unwrap(), - cap as u64, - "expected the whole capacity to be copied" - ); - assert_eq!(r.observed_buffer, buf_sz, "expected a large buffer to be provided to the reader"); - assert!(w.get_mut().observed_buffer > DEFAULT_BUF_SIZE, "expected coalesced writes"); -} - #[test] fn sink_sinks() { let mut s = sink();