From 410550665601a8abe8935f7b55d5732fe4c4224f Mon Sep 17 00:00:00 2001 From: The8472 Date: Sun, 1 Nov 2020 19:59:24 +0100 Subject: [PATCH] specialize io::copy to use the memory of the writer if it is a BufWriter --- library/std/src/io/buffered/bufwriter.rs | 14 ++++- library/std/src/io/copy.rs | 80 ++++++++++++++++++++++-- library/std/src/io/util/tests.rs | 50 ++++++++++++++- library/std/src/lib.rs | 2 + 4 files changed, 138 insertions(+), 8 deletions(-) diff --git a/library/std/src/io/buffered/bufwriter.rs b/library/std/src/io/buffered/bufwriter.rs index fa7bd0d72d1..65bc2fcf00a 100644 --- a/library/std/src/io/buffered/bufwriter.rs +++ b/library/std/src/io/buffered/bufwriter.rs @@ -117,7 +117,7 @@ pub fn with_capacity(capacity: usize, inner: W) -> BufWriter { /// "successfully written" (by returning nonzero success values from /// `write`), any 0-length writes from `inner` must be reported as i/o /// errors from this method. - pub(super) fn flush_buf(&mut self) -> io::Result<()> { + pub(in crate::io) fn flush_buf(&mut self) -> io::Result<()> { /// Helper struct to ensure the buffer is updated after all the writes /// are complete. It tracks the number of written bytes and drains them /// all from the front of the buffer when dropped. @@ -243,6 +243,18 @@ pub fn buffer(&self) -> &[u8] { &self.buf } + /// Returns a mutable reference to the internal buffer. + /// + /// This can be used to write data directly into the buffer without triggering writers + /// to the underlying writer. + /// + /// That the buffer is a `Vec` is an implementation detail. + /// Callers should not modify the capacity as there currently is no public API to do so + /// and thus any capacity changes would be unexpected by the user. + pub(in crate::io) fn buffer_mut(&mut self) -> &mut Vec { + &mut self.buf + } + /// Returns the number of bytes the internal buffer can hold without flushing. /// /// # Examples diff --git a/library/std/src/io/copy.rs b/library/std/src/io/copy.rs index b88bca2f2b4..3780f2044cb 100644 --- a/library/std/src/io/copy.rs +++ b/library/std/src/io/copy.rs @@ -1,4 +1,4 @@ -use crate::io::{self, ErrorKind, Read, Write}; +use super::{BufWriter, ErrorKind, Read, Result, Write, DEFAULT_BUF_SIZE}; use crate::mem::MaybeUninit; /// Copies the entire contents of a reader into a writer. @@ -40,7 +40,7 @@ /// } /// ``` #[stable(feature = "rust1", since = "1.0.0")] -pub fn copy(reader: &mut R, writer: &mut W) -> io::Result +pub fn copy(reader: &mut R, writer: &mut W) -> Result where R: Read, W: Write, @@ -54,14 +54,82 @@ pub fn copy(reader: &mut R, writer: &mut W) -> io::Result< } } -/// The general read-write-loop implementation of -/// `io::copy` that is used when specializations are not available or not applicable. -pub(crate) fn generic_copy(reader: &mut R, writer: &mut W) -> io::Result +/// The userspace read-write-loop implementation of `io::copy` that is used when +/// OS-specific specializations for copy offloading are not available or not applicable. +pub(crate) fn generic_copy(reader: &mut R, writer: &mut W) -> Result where R: Read, W: Write, { - let mut buf = MaybeUninit::<[u8; super::DEFAULT_BUF_SIZE]>::uninit(); + BufferedCopySpec::copy_to(reader, writer) +} + +/// Specialization of the read-write loop that either uses a stack buffer +/// or reuses the internal buffer of a BufWriter +trait BufferedCopySpec: Write { + fn copy_to(reader: &mut R, writer: &mut Self) -> Result; +} + +impl BufferedCopySpec for W { + default fn copy_to(reader: &mut R, writer: &mut Self) -> Result { + stack_buffer_copy(reader, writer) + } +} + +impl BufferedCopySpec for BufWriter { + fn copy_to(reader: &mut R, writer: &mut Self) -> Result { + if writer.capacity() < DEFAULT_BUF_SIZE { + return stack_buffer_copy(reader, writer); + } + + // FIXME: #42788 + // + // - This creates a (mut) reference to a slice of + // _uninitialized_ integers, which is **undefined behavior** + // + // - Only the standard library gets to soundly "ignore" this, + // based on its privileged knowledge of unstable rustc + // internals; + unsafe { + let spare_cap = writer.buffer_mut().spare_capacity_mut(); + reader.initializer().initialize(MaybeUninit::slice_assume_init_mut(spare_cap)); + } + + let mut len = 0; + + loop { + let buf = writer.buffer_mut(); + let spare_cap = buf.spare_capacity_mut(); + + if spare_cap.len() >= DEFAULT_BUF_SIZE { + match reader.read(unsafe { MaybeUninit::slice_assume_init_mut(spare_cap) }) { + Ok(0) => return Ok(len), // EOF reached + Ok(bytes_read) => { + assert!(bytes_read <= spare_cap.len()); + // Safety: The initializer contract guarantees that either it or `read` + // will have initialized these bytes. And we just checked that the number + // of bytes is within the buffer capacity. + unsafe { buf.set_len(buf.len() + bytes_read) }; + len += bytes_read as u64; + // Read again if the buffer still has enough capacity, as BufWriter itself would do + // This will occur if the reader returns short reads + continue; + } + Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, + Err(e) => return Err(e), + } + } + + writer.flush_buf()?; + } + } +} + +fn stack_buffer_copy( + reader: &mut R, + writer: &mut W, +) -> Result { + let mut buf = MaybeUninit::<[u8; DEFAULT_BUF_SIZE]>::uninit(); // FIXME: #42788 // // - This creates a (mut) reference to a slice of diff --git a/library/std/src/io/util/tests.rs b/library/std/src/io/util/tests.rs index df34e27d136..7632eaf872a 100644 --- a/library/std/src/io/util/tests.rs +++ b/library/std/src/io/util/tests.rs @@ -1,5 +1,8 @@ +use crate::cmp::{max, min}; use crate::io::prelude::*; -use crate::io::{copy, empty, repeat, sink, Empty, Repeat, SeekFrom, Sink}; +use crate::io::{ + copy, empty, repeat, sink, BufWriter, Empty, Repeat, Result, SeekFrom, Sink, DEFAULT_BUF_SIZE, +}; #[test] fn copy_copies() { @@ -11,6 +14,51 @@ fn copy_copies() { assert_eq!(copy(&mut r as &mut dyn Read, &mut w as &mut dyn Write).unwrap(), 1 << 17); } +struct ShortReader { + cap: usize, + read_size: usize, + observed_buffer: usize, +} + +impl Read for ShortReader { + fn read(&mut self, buf: &mut [u8]) -> Result { + let bytes = min(self.cap, self.read_size); + self.cap -= bytes; + self.observed_buffer = max(self.observed_buffer, buf.len()); + Ok(bytes) + } +} + +struct WriteObserver { + observed_buffer: usize, +} + +impl Write for WriteObserver { + fn write(&mut self, buf: &[u8]) -> Result { + self.observed_buffer = max(self.observed_buffer, buf.len()); + Ok(buf.len()) + } + + fn flush(&mut self) -> Result<()> { + Ok(()) + } +} + +#[test] +fn copy_specializes_bufwriter() { + let cap = 117 * 1024; + let buf_sz = 16 * 1024; + let mut r = ShortReader { cap, observed_buffer: 0, read_size: 1337 }; + let mut w = BufWriter::with_capacity(buf_sz, WriteObserver { observed_buffer: 0 }); + assert_eq!( + copy(&mut r, &mut w).unwrap(), + cap as u64, + "expected the whole capacity to be copied" + ); + assert_eq!(r.observed_buffer, buf_sz, "expected a large buffer to be provided to the reader"); + assert!(w.get_mut().observed_buffer > DEFAULT_BUF_SIZE, "expected coalesced writes"); +} + #[test] fn sink_sinks() { let mut s = sink(); diff --git a/library/std/src/lib.rs b/library/std/src/lib.rs index c1b79ff716c..f16fb3d04db 100644 --- a/library/std/src/lib.rs +++ b/library/std/src/lib.rs @@ -283,6 +283,7 @@ #![feature(maybe_uninit_extra)] #![feature(maybe_uninit_ref)] #![feature(maybe_uninit_slice)] +#![feature(maybe_uninit_uninit_array)] #![feature(min_specialization)] #![feature(needs_panic_runtime)] #![feature(negative_impls)] @@ -326,6 +327,7 @@ #![feature(unsafe_cell_raw_get)] #![feature(unwind_attributes)] #![feature(vec_into_raw_parts)] +#![feature(vec_spare_capacity)] #![feature(wake_trait)] // NB: the above list is sorted to minimize merge conflicts. #![default_lib_allocator]