From 3aa233d3dc0ad43f8693e4638e33e81ddf03b96b Mon Sep 17 00:00:00 2001 From: Nathan West Date: Fri, 31 Jul 2020 00:14:17 -0400 Subject: [PATCH] Rebase the LineWriter refactor to the new stdlib layout --- library/std/src/io/buffered.rs | 924 ++++++++++++++++++++++++++------- 1 file changed, 742 insertions(+), 182 deletions(-) diff --git a/library/std/src/io/buffered.rs b/library/std/src/io/buffered.rs index b4c91cced43..de33c7fe120 100644 --- a/library/std/src/io/buffered.rs +++ b/library/std/src/io/buffered.rs @@ -518,33 +518,80 @@ pub fn with_capacity(capacity: usize, inner: W) -> BufWriter { BufWriter { inner: Some(inner), buf: Vec::with_capacity(capacity), panicked: false } } + /// Send data in our local buffer into the inner writer, looping as + /// necessary until either it's all been sent or an error occurs. + /// + /// Because all the data in the buffer has been reported to our owner as + /// "successfully written" (by returning nonzero success values from + /// `write`), any 0-length writes from `inner` must be reported as i/o + /// errors from this method. fn flush_buf(&mut self) -> io::Result<()> { - let mut written = 0; - let len = self.buf.len(); - let mut ret = Ok(()); - while written < len { + /// Helper struct to ensure the buffer is updated after all the writes + /// are complete + struct BufGuard<'a> { + buffer: &'a mut Vec, + written: usize, + } + + impl<'a> BufGuard<'a> { + fn new(buffer: &'a mut Vec) -> Self { + Self { buffer, written: 0 } + } + + /// The unwritten part of the buffer + fn remaining(&self) -> &[u8] { + &self.buffer[self.written..] + } + + /// Flag some bytes as removed from the front of the buffer + fn consume(&mut self, amt: usize) { + self.written += amt; + } + + /// true if all of the bytes have been written + fn done(&self) -> bool { + self.written >= self.buffer.len() + } + } + + impl Drop for BufGuard<'_> { + fn drop(&mut self) { + if self.written > 0 { + self.buffer.drain(..self.written); + } + } + } + + let mut guard = BufGuard::new(&mut self.buf); + let inner = self.inner.as_mut().unwrap(); + while !guard.done() { self.panicked = true; - let r = self.inner.as_mut().unwrap().write(&self.buf[written..]); + let r = inner.write(guard.remaining()); self.panicked = false; match r { Ok(0) => { - ret = - Err(Error::new(ErrorKind::WriteZero, "failed to write the buffered data")); - break; + return Err(Error::new( + ErrorKind::WriteZero, + "failed to write the buffered data", + )); } - Ok(n) => written += n, + Ok(n) => guard.consume(n), Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} - Err(e) => { - ret = Err(e); - break; - } + Err(e) => return Err(e), } } - if written > 0 { - self.buf.drain(..written); - } - ret + Ok(()) + } + + /// Buffer some data without flushing it, regardless of the size of the + /// data. Writes as much as possible without exceeding capacity. Returns + /// the number of bytes written. + fn write_to_buf(&mut self, buf: &[u8]) -> usize { + let available = self.buf.capacity() - self.buf.len(); + let amt_to_buffer = available.min(buf.len()); + self.buf.extend_from_slice(&buf[..amt_to_buffer]); + amt_to_buffer } /// Gets a reference to the underlying writer. @@ -657,13 +704,35 @@ fn write(&mut self, buf: &[u8]) -> io::Result { if self.buf.len() + buf.len() > self.buf.capacity() { self.flush_buf()?; } + // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919 if buf.len() >= self.buf.capacity() { self.panicked = true; let r = self.get_mut().write(buf); self.panicked = false; r } else { - self.buf.write(buf) + self.buf.extend_from_slice(buf); + Ok(buf.len()) + } + } + + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + // Normally, `write_all` just calls `write` in a loop. We can do better + // by calling `self.get_mut().write_all()` directly, which avoids + // round trips through the buffer in the event of a series of partial + // writes in some circumstances. + if self.buf.len() + buf.len() > self.buf.capacity() { + self.flush_buf()?; + } + // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919 + if buf.len() >= self.buf.capacity() { + self.panicked = true; + let r = self.get_mut().write_all(buf); + self.panicked = false; + r + } else { + self.buf.extend_from_slice(buf); + Ok(()) } } @@ -672,13 +741,15 @@ fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { if self.buf.len() + total_len > self.buf.capacity() { self.flush_buf()?; } + // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919 if total_len >= self.buf.capacity() { self.panicked = true; let r = self.get_mut().write_vectored(bufs); self.panicked = false; r } else { - self.buf.write_vectored(bufs) + bufs.iter().for_each(|b| self.buf.extend_from_slice(b)); + Ok(total_len) } } @@ -710,7 +781,8 @@ impl Seek for BufWriter { /// /// Seeking always writes out the internal buffer before seeking. fn seek(&mut self, pos: SeekFrom) -> io::Result { - self.flush_buf().and_then(|_| self.get_mut().seek(pos)) + self.flush_buf()?; + self.get_mut().seek(pos) } } @@ -816,6 +888,268 @@ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { } } +/// Private helper struct for implementing the line-buffered writing logic. +/// This shim temporarily wraps a BufWriter, and uses its internals to +/// implement a line-buffered writer (specifically by using the internal +/// methods like write_to_buf and flush_buf). In this way, a more +/// efficient abstraction can be created than one that only had access to +/// `write` and `flush`, without needlessly duplicating a lot of the +/// implementation details of BufWriter. This also allows existing +/// `BufWriters` to be temporarily given line-buffering logic; this is what +/// enables Stdout to be alternately in line-buffered or block-buffered mode. +#[derive(Debug)] +pub(super) struct LineWriterShim<'a, W: Write> { + buffer: &'a mut BufWriter, +} + +impl<'a, W: Write> LineWriterShim<'a, W> { + pub fn new(buffer: &'a mut BufWriter) -> Self { + Self { buffer } + } + + /// Get a mutable reference to the inner writer (that is, the writer + /// wrapped by the BufWriter). Be careful with this writer, as writes to + /// it will bypass the buffer. + fn inner_mut(&mut self) -> &mut W { + self.buffer.get_mut() + } + + /// Get the content currently buffered in self.buffer + fn buffered(&self) -> &[u8] { + self.buffer.buffer() + } + + /// Flush the buffer iff the last byte is a newline (indicating that an + /// earlier write only succeeded partially, and we want to retry flushing + /// the buffered line before continuing with a subsequent write) + fn flush_if_completed_line(&mut self) -> io::Result<()> { + match self.buffered().last().copied() { + Some(b'\n') => self.buffer.flush_buf(), + _ => Ok(()), + } + } +} + +impl<'a, W: Write> Write for LineWriterShim<'a, W> { + /// Write some data into this BufReader with line buffering. This means + /// that, if any newlines are present in the data, the data up to the last + /// newline is sent directly to the underlying writer, and data after it + /// is buffered. Returns the number of bytes written. + /// + /// This function operates on a "best effort basis"; in keeping with the + /// convention of `Write::write`, it makes at most one attempt to write + /// new data to the underlying writer. If that write only reports a partial + /// success, the remaining data will be buffered. + /// + /// Because this function attempts to send completed lines to the underlying + /// writer, it will also flush the existing buffer if it contains any + /// newlines, even if the incoming data does not contain any newlines. + fn write(&mut self, buf: &[u8]) -> io::Result { + let newline_idx = match memchr::memrchr(b'\n', buf) { + // If there are no new newlines (that is, if this write is less than + // one line), just do a regular buffered write + None => { + self.flush_if_completed_line()?; + return self.buffer.write(buf); + } + // Otherwise, arrange for the lines to be written directly to the + // inner writer. + Some(newline_idx) => newline_idx + 1, + }; + + // Flush existing content to prepare for our write + self.buffer.flush_buf()?; + + // This is what we're going to try to write directly to the inner + // writer. The rest will be buffered, if nothing goes wrong. + let lines = &buf[..newline_idx]; + + // Write `lines` directly to the inner writer. In keeping with the + // `write` convention, make at most one attempt to add new (unbuffered) + // data. Because this write doesn't touch the BufWriter state directly, + // and the buffer is known to be empty, we don't need to worry about + // self.buffer.panicked here. + let flushed = self.inner_mut().write(lines)?; + + // If buffer returns Ok(0), propagate that to the caller without + // doing additional buffering; otherwise we're just guaranteeing + // an "ErrorKind::WriteZero" later. + if flushed == 0 { + return Ok(0); + } + + // Now that the write has succeeded, buffer the rest (or as much of + // the rest as possible). If there were any unwritten newlines, we + // only buffer out to the last unwritten newline that fits in the + // buffer; this helps prevent flushing partial lines on subsequent + // calls to LineWriterShim::write. + + // Handle the cases in order of most-common to least-common, under + // the presumption that most writes succeed in totality, and that most + // writes are smaller than the buffer. + // - Is this a partial line (ie, no newlines left in the unwritten tail) + // - If not, does the data out to the last unwritten newline fit in + // the buffer? + // - If not, scan for the last newline that *does* fit in the buffer + let tail = if flushed >= newline_idx { + &buf[flushed..] + } else if newline_idx - flushed <= self.buffer.capacity() { + &buf[flushed..newline_idx] + } else { + let scan_area = &buf[flushed..]; + let scan_area = &scan_area[..self.buffer.capacity()]; + match memchr::memrchr(b'\n', scan_area) { + Some(newline_idx) => &scan_area[..newline_idx + 1], + None => scan_area, + } + }; + + let buffered = self.buffer.write_to_buf(tail); + Ok(flushed + buffered) + } + + fn flush(&mut self) -> io::Result<()> { + self.buffer.flush() + } + + /// Write some vectored data into this BufReader with line buffering. This + /// means that, if any newlines are present in the data, the data up to + /// and including the buffer containing the last newline is sent directly + /// to the inner writer, and the data after it is buffered. Returns the + /// number of bytes written. + /// + /// This function operates on a "best effort basis"; in keeping with the + /// convention of `Write::write`, it makes at most one attempt to write + /// new data to the underlying writer. + /// + /// Because this function attempts to send completed lines to the underlying + /// writer, it will also flush the existing buffer if it contains any + /// newlines. + /// + /// Because sorting through an array of `IoSlice` can be a bit convoluted, + /// This method differs from write in the following ways: + /// + /// - It attempts to write the full content of all the buffers up to and + /// including the one containing the last newline. This means that it + /// may attempt to write a partial line, that buffer has data past the + /// newline. + /// - If the write only reports partial success, it does not attempt to + /// find the precise location of the written bytes and buffer the rest. + /// + /// If the underlying vector doesn't support vectored writing, we instead + /// simply write the first non-empty buffer with `write`. This way, we + /// get the benefits of more granular partial-line handling without losing + /// anything in efficiency + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { + // If there's no specialized behavior for write_vectored, just use + // write. This has the benefit of more granular partial-line handling. + if !self.is_write_vectored() { + return match bufs.iter().find(|buf| !buf.is_empty()) { + Some(buf) => self.write(buf), + None => Ok(0), + }; + } + + // Find the buffer containing the last newline + let last_newline_buf_idx = bufs + .iter() + .enumerate() + .rev() + .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i)); + + // If there are no new newlines (that is, if this write is less than + // one line), just do a regular buffered write + let last_newline_buf_idx = match last_newline_buf_idx { + // No newlines; just do a normal buffered write + None => { + self.flush_if_completed_line()?; + return self.buffer.write_vectored(bufs); + } + Some(i) => i, + }; + + // Flush existing content to prepare for our write + self.buffer.flush_buf()?; + + // This is what we're going to try to write directly to the inner + // writer. The rest will be buffered, if nothing goes wrong. + let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1); + + // Write `lines` directly to the inner writer. In keeping with the + // `write` convention, make at most one attempt to add new (unbuffered) + // data. Because this write doesn't touch the BufWriter state directly, + // and the buffer is known to be empty, we don't need to worry about + // self.panicked here. + let flushed = self.inner_mut().write_vectored(lines)?; + + // If inner returns Ok(0), propagate that to the caller without + // doing additional buffering; otherwise we're just guaranteeing + // an "ErrorKind::WriteZero" later. + if flushed == 0 { + return Ok(0); + } + + // Don't try to reconstruct the exact amount written; just bail + // in the event of a partial write + let lines_len = lines.iter().map(|buf| buf.len()).sum(); + if flushed < lines_len { + return Ok(flushed); + } + + // Now that the write has succeeded, buffer the rest (or as much of the + // rest as possible) + let buffered: usize = tail + .iter() + .filter(|buf| !buf.is_empty()) + .map(|buf| self.buffer.write_to_buf(buf)) + .take_while(|&n| n > 0) + .sum(); + + Ok(flushed + buffered) + } + + fn is_write_vectored(&self) -> bool { + self.buffer.is_write_vectored() + } + + /// Write some data into this BufReader with line buffering. This means + /// that, if any newlines are present in the data, the data up to the last + /// newline is sent directly to the underlying writer, and data after it + /// is buffered. + /// + /// Because this function attempts to send completed lines to the underlying + /// writer, it will also flush the existing buffer if it contains any + /// newlines, even if the incoming data does not contain any newlines. + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + let newline_idx = match memchr::memrchr(b'\n', buf) { + // If there are no new newlines (that is, if this write is less than + // one line), just do a regular buffered write + None => { + self.flush_if_completed_line()?; + return self.buffer.write_all(buf); + } + // Otherwise, arrange for the lines to be written directly to the + // inner writer. + Some(newline_idx) => newline_idx, + }; + + // Flush existing content to prepare for our write + self.buffer.flush_buf()?; + + // This is what we're going to try to write directly to the inner + // writer. The rest will be buffered, if nothing goes wrong. + let (lines, tail) = buf.split_at(newline_idx + 1); + + // Write `lines` directly to the inner writer, bypassing the buffer. + self.inner_mut().write_all(lines)?; + + // Now that the write has succeeded, buffer the rest with + // BufWriter::write_all. This will buffer as much as possible, but + // continue flushing as necessary if our tail is huge. + self.buffer.write_all(tail) + } +} + /// Wraps a writer and buffers output to it, flushing whenever a newline /// (`0x0a`, `'\n'`) is detected. /// @@ -883,7 +1217,6 @@ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { #[stable(feature = "rust1", since = "1.0.0")] pub struct LineWriter { inner: BufWriter, - need_flush: bool, } impl LineWriter { @@ -924,7 +1257,7 @@ pub fn new(inner: W) -> LineWriter { /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn with_capacity(capacity: usize, inner: W) -> LineWriter { - LineWriter { inner: BufWriter::with_capacity(capacity, inner), need_flush: false } + LineWriter { inner: BufWriter::with_capacity(capacity, inner) } } /// Gets a reference to the underlying writer. @@ -998,110 +1331,40 @@ pub fn get_mut(&mut self) -> &mut W { /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn into_inner(self) -> Result>> { - self.inner.into_inner().map_err(|IntoInnerError(buf, e)| { - IntoInnerError(LineWriter { inner: buf, need_flush: false }, e) - }) + self.inner + .into_inner() + .map_err(|IntoInnerError(buf, e)| IntoInnerError(LineWriter { inner: buf }, e)) } } #[stable(feature = "rust1", since = "1.0.0")] impl Write for LineWriter { fn write(&mut self, buf: &[u8]) -> io::Result { - if self.need_flush { - self.flush()?; - } - - // Find the last newline character in the buffer provided. If found then - // we're going to write all the data up to that point and then flush, - // otherwise we just write the whole block to the underlying writer. - let i = match memchr::memrchr(b'\n', buf) { - Some(i) => i, - None => return self.inner.write(buf), - }; - - // Ok, we're going to write a partial amount of the data given first - // followed by flushing the newline. After we've successfully written - // some data then we *must* report that we wrote that data, so future - // errors are ignored. We set our internal `need_flush` flag, though, in - // case flushing fails and we need to try it first next time. - let n = self.inner.write(&buf[..=i])?; - self.need_flush = true; - if self.flush().is_err() || n != i + 1 { - return Ok(n); - } - - // At this point we successfully wrote `i + 1` bytes and flushed it out, - // meaning that the entire line is now flushed out on the screen. While - // we can attempt to finish writing the rest of the data provided. - // Remember though that we ignore errors here as we've successfully - // written data, so we need to report that. - match self.inner.write(&buf[i + 1..]) { - Ok(i) => Ok(n + i), - Err(_) => Ok(n), - } - } - - // Vectored writes are very similar to the writes above, but adjusted for - // the list of buffers that we have to write. - fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { - if self.need_flush { - self.flush()?; - } - - // Find the last newline, and failing that write the whole buffer - let last_newline = bufs.iter().enumerate().rev().find_map(|(i, buf)| { - let pos = memchr::memrchr(b'\n', buf)?; - Some((i, pos)) - }); - let (i, j) = match last_newline { - Some(pair) => pair, - None => return self.inner.write_vectored(bufs), - }; - let (prefix, suffix) = bufs.split_at(i); - let (buf, suffix) = suffix.split_at(1); - let buf = &buf[0]; - - // Write everything up to the last newline, flushing afterwards. Note - // that only if we finished our entire `write_vectored` do we try the - // subsequent - // `write` - let mut n = 0; - let prefix_amt = prefix.iter().map(|i| i.len()).sum(); - if prefix_amt > 0 { - n += self.inner.write_vectored(prefix)?; - self.need_flush = true; - } - if n == prefix_amt { - match self.inner.write(&buf[..=j]) { - Ok(m) => n += m, - Err(e) if n == 0 => return Err(e), - Err(_) => return Ok(n), - } - self.need_flush = true; - } - if self.flush().is_err() || n != j + 1 + prefix_amt { - return Ok(n); - } - - // ... and now write out everything remaining - match self.inner.write(&buf[j + 1..]) { - Ok(i) => n += i, - Err(_) => return Ok(n), - } - - if suffix.iter().map(|s| s.len()).sum::() == 0 { - return Ok(n); - } - match self.inner.write_vectored(suffix) { - Ok(i) => Ok(n + i), - Err(_) => Ok(n), - } + LineWriterShim::new(&mut self.inner).write(buf) } fn flush(&mut self) -> io::Result<()> { - self.inner.flush()?; - self.need_flush = false; - Ok(()) + self.inner.flush() + } + + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { + LineWriterShim::new(&mut self.inner).write_vectored(bufs) + } + + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } + + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + LineWriterShim::new(&mut self.inner).write_all(buf) + } + + fn write_all_vectored(&mut self, bufs: &mut [IoSlice<'_>]) -> io::Result<()> { + LineWriterShim::new(&mut self.inner).write_all_vectored(bufs) + } + + fn write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> io::Result<()> { + LineWriterShim::new(&mut self.inner).write_fmt(fmt) } } @@ -1124,7 +1387,7 @@ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { #[cfg(test)] mod tests { use crate::io::prelude::*; - use crate::io::{self, BufReader, BufWriter, IoSlice, LineWriter, SeekFrom}; + use crate::io::{self, BufReader, BufWriter, ErrorKind, IoSlice, LineWriter, SeekFrom}; use crate::sync::atomic::{AtomicUsize, Ordering}; use crate::thread; @@ -1133,6 +1396,9 @@ pub struct ShortReader { lengths: Vec, } + // FIXME: rustfmt and tidy disagree about the correct formatting of this + // function. This leads to issues for users with editors configured to + // rustfmt-on-save. impl Read for ShortReader { fn read(&mut self, _: &mut [u8]) -> io::Result { if self.lengths.is_empty() { Ok(0) } else { Ok(self.lengths.remove(0)) } @@ -1408,34 +1674,6 @@ fn test_read_until() { assert_eq!(v, []); } - #[test] - fn test_line_buffer_fail_flush() { - // Issue #32085 - struct FailFlushWriter<'a>(&'a mut Vec); - - impl Write for FailFlushWriter<'_> { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.0.extend_from_slice(buf); - Ok(buf.len()) - } - fn flush(&mut self) -> io::Result<()> { - Err(io::Error::new(io::ErrorKind::Other, "flush failed")) - } - } - - let mut buf = Vec::new(); - { - let mut writer = LineWriter::new(FailFlushWriter(&mut buf)); - let to_write = b"abc\ndef"; - if let Ok(written) = writer.write(to_write) { - assert!(written < to_write.len(), "didn't flush on new line"); - // PASS - return; - } - } - assert!(buf.is_empty(), "write returned an error but wrote data"); - } - #[test] fn test_line_buffer() { let mut writer = LineWriter::new(Vec::new()); @@ -1556,41 +1794,104 @@ fn bench_buffered_writer(b: &mut test::Bencher) { b.iter(|| BufWriter::new(io::sink())); } - struct AcceptOneThenFail { - written: bool, - flushed: bool, + /// A simple `Write` target, designed to be wrapped by `LineWriter` / + /// `BufWriter` / etc, that can have its `write` & `flush` behavior + /// configured + #[derive(Default, Clone)] + struct ProgrammableSink { + // Writes append to this slice + pub buffer: Vec, + + // Flush sets this flag + pub flushed: bool, + + // If true, writes will always be an error + pub always_write_error: bool, + + // If true, flushes will always be an error + pub always_flush_error: bool, + + // If set, only up to this number of bytes will be written in a single + // call to `write` + pub accept_prefix: Option, + + // If set, counts down with each write, and writes return an error + // when it hits 0 + pub max_writes: Option, + + // If set, attempting to write when max_writes == Some(0) will be an + // error; otherwise, it will return Ok(0). + pub error_after_max_writes: bool, } - impl Write for AcceptOneThenFail { + impl Write for ProgrammableSink { fn write(&mut self, data: &[u8]) -> io::Result { - if !self.written { - assert_eq!(data, b"a\nb\n"); - self.written = true; - Ok(data.len()) - } else { - Err(io::Error::new(io::ErrorKind::NotFound, "test")) + if self.always_write_error { + return Err(io::Error::new(io::ErrorKind::Other, "test - always_write_error")); } + + match self.max_writes { + Some(0) if self.error_after_max_writes => { + return Err(io::Error::new(io::ErrorKind::Other, "test - max_writes")); + } + Some(0) => return Ok(0), + Some(ref mut count) => *count -= 1, + None => {} + } + + let len = match self.accept_prefix { + None => data.len(), + Some(prefix) => data.len().min(prefix), + }; + + let data = &data[..len]; + self.buffer.extend_from_slice(data); + + Ok(len) } fn flush(&mut self) -> io::Result<()> { - assert!(self.written); - assert!(!self.flushed); - self.flushed = true; - Err(io::Error::new(io::ErrorKind::Other, "test")) + if self.always_flush_error { + Err(io::Error::new(io::ErrorKind::Other, "test - always_flush_error")) + } else { + self.flushed = true; + Ok(()) + } } } + /// Previously the `LineWriter` could successfully write some bytes but + /// then fail to report that it has done so. Additionally, an erroneous + /// flush after a successful write was permanently ignored. + /// + /// Test that a line writer correctly reports the number of written bytes, + /// and that it attempts to flush buffered lines from previous writes + /// before processing new data + /// + /// Regression test for #37807 #[test] fn erroneous_flush_retried() { - let a = AcceptOneThenFail { written: false, flushed: false }; + let writer = ProgrammableSink { + // Only write up to 4 bytes at a time + accept_prefix: Some(4), - let mut l = LineWriter::new(a); - assert_eq!(l.write(b"a\nb\na").unwrap(), 4); - assert!(l.get_ref().written); - assert!(l.get_ref().flushed); - l.get_mut().flushed = false; + // Accept the first two writes, then error the others + max_writes: Some(2), + error_after_max_writes: true, - assert_eq!(l.write(b"a").unwrap_err().kind(), io::ErrorKind::Other) + ..Default::default() + }; + + // This should write the first 4 bytes. The rest will be buffered, out + // to the last newline. + let mut writer = LineWriter::new(writer); + assert_eq!(writer.write(b"a\nb\nc\nd\ne").unwrap(), 8); + + // This write should attempt to flush "c\nd\n", then buffer "e". No + // errors should happen here because no further writes should be + // attempted against `writer`. + assert_eq!(writer.write(b"e").unwrap(), 1); + assert_eq!(&writer.get_ref().buffer, b"a\nb\nc\nd\n"); } #[test] @@ -1635,17 +1936,21 @@ fn line_vectored() { 0, ); assert_eq!(a.write_vectored(&[IoSlice::new(b"a\nb"),]).unwrap(), 3); - assert_eq!(a.get_ref(), b"\nabaca\n"); + assert_eq!(a.get_ref(), b"\nabaca\nb"); } #[test] fn line_vectored_partial_and_errors() { + use crate::collections::VecDeque; + enum Call { Write { inputs: Vec<&'static [u8]>, output: io::Result }, Flush { output: io::Result<()> }, } + + #[derive(Default)] struct Writer { - calls: Vec, + calls: VecDeque, } impl Write for Writer { @@ -1654,19 +1959,23 @@ fn write(&mut self, buf: &[u8]) -> io::Result { } fn write_vectored(&mut self, buf: &[IoSlice<'_>]) -> io::Result { - match self.calls.pop().unwrap() { + match self.calls.pop_front().expect("unexpected call to write") { Call::Write { inputs, output } => { assert_eq!(inputs, buf.iter().map(|b| &**b).collect::>()); output } - _ => panic!("unexpected call to write"), + Call::Flush { .. } => panic!("unexpected call to write; expected a flush"), } } + fn is_write_vectored(&self) -> bool { + true + } + fn flush(&mut self) -> io::Result<()> { - match self.calls.pop().unwrap() { + match self.calls.pop_front().expect("Unexpected call to flush") { Call::Flush { output } => output, - _ => panic!("unexpected call to flush"), + Call::Write { .. } => panic!("unexpected call to flush; expected a write"), } } } @@ -1680,24 +1989,275 @@ fn drop(&mut self) { } // partial writes keep going - let mut a = LineWriter::new(Writer { calls: Vec::new() }); + let mut a = LineWriter::new(Writer::default()); a.write_vectored(&[IoSlice::new(&[]), IoSlice::new(b"abc")]).unwrap(); - a.get_mut().calls.push(Call::Flush { output: Ok(()) }); - a.get_mut().calls.push(Call::Write { inputs: vec![b"bcx\n"], output: Ok(4) }); - a.get_mut().calls.push(Call::Write { inputs: vec![b"abcx\n"], output: Ok(1) }); + + a.get_mut().calls.push_back(Call::Write { inputs: vec![b"abc"], output: Ok(1) }); + a.get_mut().calls.push_back(Call::Write { inputs: vec![b"bc"], output: Ok(2) }); + a.get_mut().calls.push_back(Call::Write { inputs: vec![b"x", b"\n"], output: Ok(2) }); + a.write_vectored(&[IoSlice::new(b"x"), IoSlice::new(b"\n")]).unwrap(); - a.get_mut().calls.push(Call::Flush { output: Ok(()) }); + + a.get_mut().calls.push_back(Call::Flush { output: Ok(()) }); a.flush().unwrap(); // erroneous writes stop and don't write more - a.get_mut().calls.push(Call::Write { inputs: vec![b"x\n"], output: Err(err()) }); - assert_eq!(a.write_vectored(&[IoSlice::new(b"x"), IoSlice::new(b"\na")]).unwrap(), 2); - a.get_mut().calls.push(Call::Flush { output: Ok(()) }); - a.get_mut().calls.push(Call::Write { inputs: vec![b"x\n"], output: Ok(2) }); + a.get_mut().calls.push_back(Call::Write { inputs: vec![b"x", b"\na"], output: Err(err()) }); + a.get_mut().calls.push_back(Call::Flush { output: Ok(()) }); + assert!(a.write_vectored(&[IoSlice::new(b"x"), IoSlice::new(b"\na")]).is_err()); a.flush().unwrap(); fn err() -> io::Error { io::Error::new(io::ErrorKind::Other, "x") } } + + /// Test that, in cases where vectored writing is not enabled, the + /// LineWriter uses the normal `write` call, which more-corectly handles + /// partial lines + #[test] + fn line_vectored_ignored() { + let writer = ProgrammableSink::default(); + let mut writer = LineWriter::new(writer); + + let content = [ + IoSlice::new(&[]), + IoSlice::new(b"Line 1\nLine"), + IoSlice::new(b" 2\nLine 3\nL"), + IoSlice::new(&[]), + IoSlice::new(&[]), + IoSlice::new(b"ine 4"), + IoSlice::new(b"\nLine 5\n"), + ]; + + let count = writer.write_vectored(&content).unwrap(); + assert_eq!(count, 11); + assert_eq!(&writer.get_ref().buffer, b"Line 1\n"); + + let count = writer.write_vectored(&content[2..]).unwrap(); + assert_eq!(count, 11); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3\n"); + + let count = writer.write_vectored(&content[5..]).unwrap(); + assert_eq!(count, 5); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3\n"); + + let count = writer.write_vectored(&content[6..]).unwrap(); + assert_eq!(count, 8); + assert_eq!( + writer.get_ref().buffer.as_slice(), + b"Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n".as_ref() + ); + } + + /// Test that, given this input: + /// + /// Line 1\n + /// Line 2\n + /// Line 3\n + /// Line 4 + /// + /// And given a result that only writes to midway through Line 2 + /// + /// That only up to the end of Line 3 is buffered + /// + /// This behavior is desirable because it prevents flushing partial lines + #[test] + fn partial_write_buffers_line() { + let writer = ProgrammableSink { accept_prefix: Some(13), ..Default::default() }; + let mut writer = LineWriter::new(writer); + + assert_eq!(writer.write(b"Line 1\nLine 2\nLine 3\nLine4").unwrap(), 21); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2"); + + assert_eq!(writer.write(b"Line 4").unwrap(), 6); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3\n"); + } + + /// Test that, given this input: + /// + /// Line 1\n + /// Line 2\n + /// Line 3 + /// + /// And given that the full write of lines 1 and 2 was successful + /// That data up to Line 3 is buffered + #[test] + fn partial_line_buffered_after_line_write() { + let writer = ProgrammableSink::default(); + let mut writer = LineWriter::new(writer); + + assert_eq!(writer.write(b"Line 1\nLine 2\nLine 3").unwrap(), 20); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\n"); + + assert!(writer.flush().is_ok()); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3"); + } + + /// Test that, given a partial line that exceeds the length of + /// LineBuffer's buffer (that is, without a trailing newline), that that + /// line is written to the inner writer + #[test] + fn long_line_flushed() { + let writer = ProgrammableSink::default(); + let mut writer = LineWriter::with_capacity(5, writer); + + assert_eq!(writer.write(b"0123456789").unwrap(), 10); + assert_eq!(&writer.get_ref().buffer, b"0123456789"); + } + + /// Test that, given a very long partial line *after* successfully + /// flushing a complete line, that that line is buffered unconditionally, + /// and no additional writes take place. This assures the property that + /// `write` should make at-most-one attempt to write new data. + #[test] + fn line_long_tail_not_flushed() { + let writer = ProgrammableSink::default(); + let mut writer = LineWriter::with_capacity(5, writer); + + // Assert that Line 1\n is flushed, and 01234 is buffered + assert_eq!(writer.write(b"Line 1\n0123456789").unwrap(), 12); + assert_eq!(&writer.get_ref().buffer, b"Line 1\n"); + + // Because the buffer is full, this subsequent write will flush it + assert_eq!(writer.write(b"5").unwrap(), 1); + assert_eq!(&writer.get_ref().buffer, b"Line 1\n01234"); + } + + /// Test that, if an attempt to pre-flush buffered data returns Ok(0), + /// this is propagated as an error. + #[test] + fn line_buffer_write0_error() { + let writer = ProgrammableSink { + // Accept one write, then return Ok(0) on subsequent ones + max_writes: Some(1), + + ..Default::default() + }; + let mut writer = LineWriter::new(writer); + + // This should write "Line 1\n" and buffer "Partial" + assert_eq!(writer.write(b"Line 1\nPartial").unwrap(), 14); + assert_eq!(&writer.get_ref().buffer, b"Line 1\n"); + + // This will attempt to flush "partial", which will return Ok(0), which + // needs to be an error, because we've already informed the client + // that we accepted the write. + let err = writer.write(b" Line End\n").unwrap_err(); + assert_eq!(err.kind(), ErrorKind::WriteZero); + assert_eq!(&writer.get_ref().buffer, b"Line 1\n"); + } + + /// Test that, if a write returns Ok(0) after a successful pre-flush, this + /// is propogated as Ok(0) + #[test] + fn line_buffer_write0_normal() { + let writer = ProgrammableSink { + // Accept two writes, then return Ok(0) on subsequent ones + max_writes: Some(2), + + ..Default::default() + }; + let mut writer = LineWriter::new(writer); + + // This should write "Line 1\n" and buffer "Partial" + assert_eq!(writer.write(b"Line 1\nPartial").unwrap(), 14); + assert_eq!(&writer.get_ref().buffer, b"Line 1\n"); + + // This will flush partial, which will succeed, but then return Ok(0) + // when flushing " Line End\n" + assert_eq!(writer.write(b" Line End\n").unwrap(), 0); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nPartial"); + } + + /// LineWriter has a custom `write_all`; make sure it works correctly + #[test] + fn line_write_all() { + let writer = ProgrammableSink { + // Only write 5 bytes at a time + accept_prefix: Some(5), + ..Default::default() + }; + let mut writer = LineWriter::new(writer); + + writer.write_all(b"Line 1\nLine 2\nLine 3\nLine 4\nPartial").unwrap(); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3\nLine 4\n"); + writer.write_all(b" Line 5\n").unwrap(); + assert_eq!( + writer.get_ref().buffer.as_slice(), + b"Line 1\nLine 2\nLine 3\nLine 4\nPartial Line 5\n".as_ref(), + ); + } + + #[test] + fn line_write_all_error() { + let writer = ProgrammableSink { + // Only accept up to 3 writes of up to 5 bytes each + accept_prefix: Some(5), + max_writes: Some(3), + ..Default::default() + }; + + let mut writer = LineWriter::new(writer); + let res = writer.write_all(b"Line 1\nLine 2\nLine 3\nLine 4\nPartial"); + assert!(res.is_err()); + // An error from write_all leaves everything in an indeterminate state, + // so there's nothing else to test here + } + + /// Under certain circumstances, the old implementation of LineWriter + /// would try to buffer "to the last newline" but be forced to buffer + /// less than that, leading to inappropriate partial line writes. + /// Regression test for that issue. + #[test] + fn partial_multiline_buffering() { + let writer = ProgrammableSink { + // Write only up to 5 bytes at a time + accept_prefix: Some(5), + ..Default::default() + }; + + let mut writer = LineWriter::with_capacity(10, writer); + + let content = b"AAAAABBBBB\nCCCCDDDDDD\nEEE"; + + // When content is written, LineWriter will try to write blocks A, B, + // C, and D. Only block A will succeed. Under the old behavior, LineWriter + // would then try to buffer B, C and D, but because its capacity is 10, + // it will only be able to buffer B and C. We don't want to buffer + // partial lines concurrent with whole lines, so the correct behavior + // is to buffer only block B (out to the newline) + assert_eq!(writer.write(content).unwrap(), 11); + assert_eq!(writer.get_ref().buffer, *b"AAAAA"); + + writer.flush().unwrap(); + assert_eq!(writer.get_ref().buffer, *b"AAAAABBBBB\n"); + } + + /// Same as test_partial_multiline_buffering, but in the event NO full lines + /// fit in the buffer, just buffer as much as possible + #[test] + fn partial_multiline_buffering_without_full_line() { + let writer = ProgrammableSink { + // Write only up to 5 bytes at a time + accept_prefix: Some(5), + ..Default::default() + }; + + let mut writer = LineWriter::with_capacity(5, writer); + + let content = b"AAAAABBBBBBBBBB\nCCCCC\nDDDDD"; + + // When content is written, LineWriter will try to write blocks A, B, + // and C. Only block A will succeed. Under the old behavior, LineWriter + // would then try to buffer B and C, but because its capacity is 5, + // it will only be able to buffer part of B. Because it's not possible + // for it to buffer any complete lines, it should buffer as much of B as + // possible + assert_eq!(writer.write(content).unwrap(), 10); + assert_eq!(writer.get_ref().buffer, *b"AAAAA"); + + writer.flush().unwrap(); + assert_eq!(writer.get_ref().buffer, *b"AAAAABBBBB"); + } }