From 6438ef9aa3eec59f413e89278519df38646baeb0 Mon Sep 17 00:00:00 2001 From: Lukas Wirth Date: Mon, 13 Jun 2022 13:34:07 +0200 Subject: [PATCH] internal: Bring back JodChild into flychecking for cancellation --- crates/flycheck/src/lib.rs | 77 +++++++++++++------ crates/project-model/src/build_scripts.rs | 2 +- crates/stdx/src/lib.rs | 12 +++ crates/stdx/src/process.rs | 90 +++++++++++++---------- 4 files changed, 117 insertions(+), 64 deletions(-) diff --git a/crates/flycheck/src/lib.rs b/crates/flycheck/src/lib.rs index 2cd19952678..4ff49cc8053 100644 --- a/crates/flycheck/src/lib.rs +++ b/crates/flycheck/src/lib.rs @@ -2,12 +2,16 @@ //! another compatible command (f.x. clippy) in a background thread and provide //! LSP diagnostics based on the output of the command. -use std::{fmt, io, process::Command, time::Duration}; +use std::{ + fmt, io, + process::{ChildStderr, ChildStdout, Command, Stdio}, + time::Duration, +}; use crossbeam_channel::{never, select, unbounded, Receiver, Sender}; use paths::AbsPathBuf; use serde::Deserialize; -use stdx::process::streaming_output; +use stdx::{process::streaming_output, JodChild}; pub use cargo_metadata::diagnostic::{ Applicability, Diagnostic, DiagnosticCode, DiagnosticLevel, DiagnosticSpan, @@ -117,7 +121,7 @@ struct FlycheckActor { sender: Box, config: FlycheckConfig, workspace_root: AbsPathBuf, - /// WatchThread exists to wrap around the communication needed to be able to + /// CargoHandle exists to wrap around the communication needed to be able to /// run `cargo check` without blocking. Currently the Rust standard library /// doesn't provide a way to read sub-process output without blocking, so we /// have to wrap sub-processes output handling in a thread and pass messages @@ -153,14 +157,24 @@ fn run(mut self, inbox: Receiver) { while let Some(event) = self.next_event(&inbox) { match event { Event::Restart(Restart) => { + // Drop and cancel the previously spawned process + self.cargo_handle.take(); while let Ok(Restart) = inbox.recv_timeout(Duration::from_millis(50)) {} self.cancel_check_process(); let command = self.check_command(); - tracing::info!("restart flycheck {:?}", command); - self.cargo_handle = Some(CargoHandle::spawn(command)); - self.progress(Progress::DidStart); + let command_f = format!("restart flycheck {command:?}"); + match CargoHandle::spawn(command) { + Ok(cargo_handle) => { + tracing::info!("{}", command_f); + self.cargo_handle = Some(cargo_handle); + self.progress(Progress::DidStart); + } + Err(e) => { + tracing::error!("{command_f} failed: {e:?}",); + } + } } Event::CheckEvent(None) => { // Watcher finished, replace it with a never channel to @@ -249,37 +263,58 @@ fn send(&self, check_task: Message) { } } +/// A handle to a cargo process used for fly-checking. struct CargoHandle { - thread: jod_thread::JoinHandle>, + /// The handle to the actual cargo process. As we cannot cancel directly from with + /// a read syscall dropping and therefor terminating the process is our best option. + child: JodChild, + thread: jod_thread::JoinHandle>, receiver: Receiver, } impl CargoHandle { - fn spawn(command: Command) -> CargoHandle { + fn spawn(mut command: Command) -> std::io::Result { + command.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(Stdio::null()); + let mut child = JodChild::spawn(command)?; + + let stdout = child.stdout.take().unwrap(); + let stderr = child.stderr.take().unwrap(); + let (sender, receiver) = unbounded(); - let actor = CargoActor::new(sender); + let actor = CargoActor::new(sender, stdout, stderr); let thread = jod_thread::Builder::new() .name("CargoHandle".to_owned()) - .spawn(move || actor.run(command)) + .spawn(move || actor.run()) .expect("failed to spawn thread"); - CargoHandle { thread, receiver } + Ok(CargoHandle { child, thread, receiver }) } fn join(self) -> io::Result<()> { - self.thread.join() + let exit_status = self.child.wait()?; + let (read_at_least_one_message, error) = self.thread.join()?; + if read_at_least_one_message || exit_status.success() { + Ok(()) + } else { + Err(io::Error::new(io::ErrorKind::Other, format!( + "Cargo watcher failed, the command produced no valid metadata (exit code: {:?}):\n{}", + exit_status, error + ))) + } } } struct CargoActor { sender: Sender, + stdout: ChildStdout, + stderr: ChildStderr, } impl CargoActor { - fn new(sender: Sender) -> CargoActor { - CargoActor { sender } + fn new(sender: Sender, stdout: ChildStdout, stderr: ChildStderr) -> CargoActor { + CargoActor { sender, stdout, stderr } } - fn run(self, command: Command) -> io::Result<()> { + fn run(self) -> io::Result<(bool, String)> { // We manually read a line at a time, instead of using serde's // stream deserializers, because the deserializer cannot recover // from an error, resulting in it getting stuck, because we try to @@ -292,7 +327,8 @@ fn run(self, command: Command) -> io::Result<()> { let mut error = String::new(); let mut read_at_least_one_message = false; let output = streaming_output( - command, + self.stdout, + self.stderr, &mut |line| { read_at_least_one_message = true; @@ -325,14 +361,7 @@ fn run(self, command: Command) -> io::Result<()> { }, ); match output { - Ok(_) if read_at_least_one_message => Ok(()), - Ok(output) if output.status.success() => Ok(()), - Ok(output) => { - Err(io::Error::new(io::ErrorKind::Other, format!( - "Cargo watcher failed, the command produced no valid metadata (exit code: {:?}):\n{}", - output.status, error - ))) - } + Ok(_) => Ok((read_at_least_one_message, error)), Err(e) => Err(io::Error::new(e.kind(), format!("{:?}: {}", e, error))), } } diff --git a/crates/project-model/src/build_scripts.rs b/crates/project-model/src/build_scripts.rs index c18a8ca1636..ee7f8339a76 100644 --- a/crates/project-model/src/build_scripts.rs +++ b/crates/project-model/src/build_scripts.rs @@ -110,7 +110,7 @@ pub(crate) fn run( }; tracing::info!("Running build scripts: {:?}", cmd); - let output = stdx::process::streaming_output( + let output = stdx::process::spawn_with_streaming_output( cmd, &mut |line| { // Copy-pasted from existing cargo_metadata. It seems like we diff --git a/crates/stdx/src/lib.rs b/crates/stdx/src/lib.rs index 16b8558f411..c52601898d7 100644 --- a/crates/stdx/src/lib.rs +++ b/crates/stdx/src/lib.rs @@ -1,5 +1,6 @@ //! Missing batteries for standard libraries. use std::iter; +use std::process::Command; use std::{cmp::Ordering, ops, time::Instant}; mod macros; @@ -132,6 +133,7 @@ fn drop(&mut self) { D(Some(f)) } +/// A [`std::process::Child`] wrapper that will kill the child on drop. #[cfg_attr(not(target_arch = "wasm32"), repr(transparent))] #[derive(Debug)] pub struct JodChild(pub std::process::Child); @@ -157,6 +159,16 @@ fn drop(&mut self) { } impl JodChild { + pub fn spawn(mut command: Command) -> std::io::Result { + command.spawn().map(Self) + } + + pub fn wait(self) -> std::io::Result { + let mut inner = self.into_inner(); + let _ = inner.kill(); + inner.wait() + } + pub fn into_inner(self) -> std::process::Child { if cfg!(target_arch = "wasm32") { panic!("no processes on wasm"); diff --git a/crates/stdx/src/process.rs b/crates/stdx/src/process.rs index b26b71c9de9..e5aa3436518 100644 --- a/crates/stdx/src/process.rs +++ b/crates/stdx/src/process.rs @@ -5,54 +5,66 @@ use std::{ io, - process::{Command, Output, Stdio}, + process::{ChildStderr, ChildStdout, Command, Output, Stdio}, }; +use crate::JodChild; + pub fn streaming_output( + out: ChildStdout, + err: ChildStderr, + on_stdout_line: &mut dyn FnMut(&str), + on_stderr_line: &mut dyn FnMut(&str), +) -> io::Result<(Vec, Vec)> { + let mut stdout = Vec::new(); + let mut stderr = Vec::new(); + + imp::read2(out, err, &mut |is_out, data, eof| { + let idx = if eof { + data.len() + } else { + match data.iter().rposition(|b| *b == b'\n') { + Some(i) => i + 1, + None => return, + } + }; + { + // scope for new_lines + let new_lines = { + let dst = if is_out { &mut stdout } else { &mut stderr }; + let start = dst.len(); + let data = data.drain(..idx); + dst.extend(data); + &dst[start..] + }; + for line in String::from_utf8_lossy(new_lines).lines() { + if is_out { + on_stdout_line(line); + } else { + on_stderr_line(line); + } + } + } + })?; + + Ok((stdout, stderr)) +} + +pub fn spawn_with_streaming_output( mut cmd: Command, on_stdout_line: &mut dyn FnMut(&str), on_stderr_line: &mut dyn FnMut(&str), ) -> io::Result { - let mut stdout = Vec::new(); - let mut stderr = Vec::new(); - let cmd = cmd.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(Stdio::null()); - let status = { - let mut child = cmd.spawn()?; - let out = child.stdout.take().unwrap(); - let err = child.stderr.take().unwrap(); - imp::read2(out, err, &mut |is_out, data, eof| { - let idx = if eof { - data.len() - } else { - match data.iter().rposition(|b| *b == b'\n') { - Some(i) => i + 1, - None => return, - } - }; - { - // scope for new_lines - let new_lines = { - let dst = if is_out { &mut stdout } else { &mut stderr }; - let start = dst.len(); - let data = data.drain(..idx); - dst.extend(data); - &dst[start..] - }; - for line in String::from_utf8_lossy(new_lines).lines() { - if is_out { - on_stdout_line(line); - } else { - on_stderr_line(line); - } - } - } - })?; - let _ = child.kill(); - child.wait()? - }; - + let mut child = JodChild(cmd.spawn()?); + let (stdout, stderr) = streaming_output( + child.stdout.take().unwrap(), + child.stderr.take().unwrap(), + on_stdout_line, + on_stderr_line, + )?; + let status = child.wait()?; Ok(Output { status, stdout, stderr }) }