Auto merge of #112330 - the8472:use-buf-reader-buffer, r=thomcc

Extend io::copy buffer reuse to BufReader too

previously it was only able to use BufWriter. This was due to a limitation in the BufReader generics that prevented specialization. This change works around the issue by using `BufReader where Self: Read` instead of `BufReader<I> where I: Read`. This limits our options, e.g. we can't access the inner reader, but it happens to work out if we rely on some implementation details.

Copying 1MiB from `/dev/zero` to `/dev/null` through a 256KiB BufReader yields following improvements

```
OLD:
    io::copy::tests::bench_copy_buf_reader  51.44µs/iter +/- 703.00ns
NEW:
    io::copy::tests::bench_copy_buf_reader  18.55µs/iter +/- 237.00ns
```

Previously this would read 256KiB into the reader but then copy 8KiB chunks to the writer through an additional intermediate buffer inside `io::copy`. Since those devices don't do much work most of the speedup should come from fewer syscalls and avoided memcopies.

The b3sum crate [notes that the default buffer size in io::copy is too small](4108923f52/b3sum/src/main.rs (L235-L239)). With this optimization they could achieve the desired performance by wrapping the reader in a `BufReader` instead of handrolling it.

Currently the optimization doesn't apply to things like `StdinLock`, but this can be addressed with an additional `AsMutBufReader` specialization trait.
This commit is contained in:
bors 2023-06-17 10:30:59 +00:00
commit 7513407ac8
4 changed files with 207 additions and 74 deletions

View File

@ -222,7 +222,7 @@ pub fn into_inner(self) -> R
/// Invalidates all data in the internal buffer.
#[inline]
fn discard_buffer(&mut self) {
pub(in crate::io) fn discard_buffer(&mut self) {
self.buf.discard_buffer()
}
}

View File

@ -1,6 +1,9 @@
use super::{BorrowedBuf, BufWriter, ErrorKind, Read, Result, Write, DEFAULT_BUF_SIZE};
use super::{BorrowedBuf, BufReader, BufWriter, ErrorKind, Read, Result, Write, DEFAULT_BUF_SIZE};
use crate::mem::MaybeUninit;
#[cfg(test)]
mod tests;
/// Copies the entire contents of a reader into a writer.
///
/// This function will continuously read data from `reader` and then
@ -71,32 +74,113 @@ pub(crate) fn generic_copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W)
R: Read,
W: Write,
{
BufferedCopySpec::copy_to(reader, writer)
let read_buf = BufferedReaderSpec::buffer_size(reader);
let write_buf = BufferedWriterSpec::buffer_size(writer);
if read_buf >= DEFAULT_BUF_SIZE && read_buf >= write_buf {
return BufferedReaderSpec::copy_to(reader, writer);
}
BufferedWriterSpec::copy_from(writer, reader)
}
/// Specialization of the read-write loop that reuses the internal
/// buffer of a BufReader. If there's no buffer then the writer side
/// should be used intead.
trait BufferedReaderSpec {
fn buffer_size(&self) -> usize;
fn copy_to(&mut self, to: &mut (impl Write + ?Sized)) -> Result<u64>;
}
impl<T> BufferedReaderSpec for T
where
Self: Read,
T: ?Sized,
{
#[inline]
default fn buffer_size(&self) -> usize {
0
}
default fn copy_to(&mut self, _to: &mut (impl Write + ?Sized)) -> Result<u64> {
unimplemented!("only called from specializations");
}
}
impl<I> BufferedReaderSpec for BufReader<I>
where
Self: Read,
I: ?Sized,
{
fn buffer_size(&self) -> usize {
self.capacity()
}
fn copy_to(&mut self, to: &mut (impl Write + ?Sized)) -> Result<u64> {
let mut len = 0;
loop {
// Hack: this relies on `impl Read for BufReader` always calling fill_buf
// if the buffer is empty, even for empty slices.
// It can't be called directly here since specialization prevents us
// from adding I: Read
match self.read(&mut []) {
Ok(_) => {}
Err(e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
}
let buf = self.buffer();
if self.buffer().len() == 0 {
return Ok(len);
}
// In case the writer side is a BufWriter then its write_all
// implements an optimization that passes through large
// buffers to the underlying writer. That code path is #[cold]
// but we're still avoiding redundant memcopies when doing
// a copy between buffered inputs and outputs.
to.write_all(buf)?;
len += buf.len() as u64;
self.discard_buffer();
}
}
}
/// Specialization of the read-write loop that either uses a stack buffer
/// or reuses the internal buffer of a BufWriter
trait BufferedCopySpec: Write {
fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64>;
trait BufferedWriterSpec: Write {
fn buffer_size(&self) -> usize;
fn copy_from<R: Read + ?Sized>(&mut self, reader: &mut R) -> Result<u64>;
}
impl<W: Write + ?Sized> BufferedCopySpec for W {
default fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64> {
stack_buffer_copy(reader, writer)
impl<W: Write + ?Sized> BufferedWriterSpec for W {
#[inline]
default fn buffer_size(&self) -> usize {
0
}
default fn copy_from<R: Read + ?Sized>(&mut self, reader: &mut R) -> Result<u64> {
stack_buffer_copy(reader, self)
}
}
impl<I: ?Sized + Write> BufferedCopySpec for BufWriter<I> {
fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64> {
if writer.capacity() < DEFAULT_BUF_SIZE {
return stack_buffer_copy(reader, writer);
impl<I: Write + ?Sized> BufferedWriterSpec for BufWriter<I> {
fn buffer_size(&self) -> usize {
self.capacity()
}
fn copy_from<R: Read + ?Sized>(&mut self, reader: &mut R) -> Result<u64> {
if self.capacity() < DEFAULT_BUF_SIZE {
return stack_buffer_copy(reader, self);
}
let mut len = 0;
let mut init = 0;
loop {
let buf = writer.buffer_mut();
let buf = self.buffer_mut();
let mut read_buf: BorrowedBuf<'_> = buf.spare_capacity_mut().into();
unsafe {
@ -127,7 +211,7 @@ fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64> {
Err(e) => return Err(e),
}
} else {
writer.flush_buf()?;
self.flush_buf()?;
init = 0;
}
}

View File

@ -0,0 +1,108 @@
use crate::cmp::{max, min};
use crate::io::*;
#[test]
fn copy_copies() {
let mut r = repeat(0).take(4);
let mut w = sink();
assert_eq!(copy(&mut r, &mut w).unwrap(), 4);
let mut r = repeat(0).take(1 << 17);
assert_eq!(copy(&mut r as &mut dyn Read, &mut w as &mut dyn Write).unwrap(), 1 << 17);
}
struct ShortReader {
cap: usize,
read_size: usize,
observed_buffer: usize,
}
impl Read for ShortReader {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let bytes = min(self.cap, self.read_size);
self.cap -= bytes;
self.observed_buffer = max(self.observed_buffer, buf.len());
Ok(bytes)
}
}
struct WriteObserver {
observed_buffer: usize,
}
impl Write for WriteObserver {
fn write(&mut self, buf: &[u8]) -> Result<usize> {
self.observed_buffer = max(self.observed_buffer, buf.len());
Ok(buf.len())
}
fn flush(&mut self) -> Result<()> {
Ok(())
}
}
#[test]
fn copy_specializes_bufwriter() {
let cap = 117 * 1024;
let buf_sz = 16 * 1024;
let mut r = ShortReader { cap, observed_buffer: 0, read_size: 1337 };
let mut w = BufWriter::with_capacity(buf_sz, WriteObserver { observed_buffer: 0 });
assert_eq!(
copy(&mut r, &mut w).unwrap(),
cap as u64,
"expected the whole capacity to be copied"
);
assert_eq!(r.observed_buffer, buf_sz, "expected a large buffer to be provided to the reader");
assert!(w.get_mut().observed_buffer > DEFAULT_BUF_SIZE, "expected coalesced writes");
}
#[test]
fn copy_specializes_bufreader() {
let mut source = vec![0; 768 * 1024];
source[1] = 42;
let mut buffered = BufReader::with_capacity(256 * 1024, Cursor::new(&mut source));
let mut sink = Vec::new();
assert_eq!(crate::io::copy(&mut buffered, &mut sink).unwrap(), source.len() as u64);
assert_eq!(source.as_slice(), sink.as_slice());
let buf_sz = 71 * 1024;
assert!(buf_sz > DEFAULT_BUF_SIZE, "test precondition");
let mut buffered = BufReader::with_capacity(buf_sz, Cursor::new(&mut source));
let mut sink = WriteObserver { observed_buffer: 0 };
assert_eq!(crate::io::copy(&mut buffered, &mut sink).unwrap(), source.len() as u64);
assert_eq!(
sink.observed_buffer, buf_sz,
"expected a large buffer to be provided to the writer"
);
}
#[cfg(unix)]
mod io_benches {
use crate::fs::File;
use crate::fs::OpenOptions;
use crate::io::prelude::*;
use crate::io::BufReader;
use test::Bencher;
#[bench]
fn bench_copy_buf_reader(b: &mut Bencher) {
let mut file_in = File::open("/dev/zero").expect("opening /dev/zero failed");
// use dyn to avoid specializations unrelated to readbuf
let dyn_in = &mut file_in as &mut dyn Read;
let mut reader = BufReader::with_capacity(256 * 1024, dyn_in.take(0));
let mut writer =
OpenOptions::new().write(true).open("/dev/null").expect("opening /dev/null failed");
const BYTES: u64 = 1024 * 1024;
b.bytes = BYTES;
b.iter(|| {
reader.get_mut().set_limit(BYTES);
crate::io::copy(&mut reader, &mut writer).unwrap()
});
}
}

View File

@ -1,67 +1,8 @@
use crate::cmp::{max, min};
use crate::io::prelude::*;
use crate::io::{
copy, empty, repeat, sink, BorrowedBuf, BufWriter, Empty, Repeat, Result, SeekFrom, Sink,
DEFAULT_BUF_SIZE,
};
use crate::io::{empty, repeat, sink, BorrowedBuf, Empty, Repeat, SeekFrom, Sink};
use crate::mem::MaybeUninit;
#[test]
fn copy_copies() {
let mut r = repeat(0).take(4);
let mut w = sink();
assert_eq!(copy(&mut r, &mut w).unwrap(), 4);
let mut r = repeat(0).take(1 << 17);
assert_eq!(copy(&mut r as &mut dyn Read, &mut w as &mut dyn Write).unwrap(), 1 << 17);
}
struct ShortReader {
cap: usize,
read_size: usize,
observed_buffer: usize,
}
impl Read for ShortReader {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let bytes = min(self.cap, self.read_size);
self.cap -= bytes;
self.observed_buffer = max(self.observed_buffer, buf.len());
Ok(bytes)
}
}
struct WriteObserver {
observed_buffer: usize,
}
impl Write for WriteObserver {
fn write(&mut self, buf: &[u8]) -> Result<usize> {
self.observed_buffer = max(self.observed_buffer, buf.len());
Ok(buf.len())
}
fn flush(&mut self) -> Result<()> {
Ok(())
}
}
#[test]
fn copy_specializes_bufwriter() {
let cap = 117 * 1024;
let buf_sz = 16 * 1024;
let mut r = ShortReader { cap, observed_buffer: 0, read_size: 1337 };
let mut w = BufWriter::with_capacity(buf_sz, WriteObserver { observed_buffer: 0 });
assert_eq!(
copy(&mut r, &mut w).unwrap(),
cap as u64,
"expected the whole capacity to be copied"
);
assert_eq!(r.observed_buffer, buf_sz, "expected a large buffer to be provided to the reader");
assert!(w.get_mut().observed_buffer > DEFAULT_BUF_SIZE, "expected coalesced writes");
}
#[test]
fn sink_sinks() {
let mut s = sink();