internal: Bring back JodChild into flychecking for cancellation

This commit is contained in:
Lukas Wirth 2022-06-13 13:34:07 +02:00
parent 7db73875ac
commit 6438ef9aa3
4 changed files with 117 additions and 64 deletions

View File

@ -2,12 +2,16 @@
//! another compatible command (f.x. clippy) in a background thread and provide //! another compatible command (f.x. clippy) in a background thread and provide
//! LSP diagnostics based on the output of the command. //! LSP diagnostics based on the output of the command.
use std::{fmt, io, process::Command, time::Duration}; use std::{
fmt, io,
process::{ChildStderr, ChildStdout, Command, Stdio},
time::Duration,
};
use crossbeam_channel::{never, select, unbounded, Receiver, Sender}; use crossbeam_channel::{never, select, unbounded, Receiver, Sender};
use paths::AbsPathBuf; use paths::AbsPathBuf;
use serde::Deserialize; use serde::Deserialize;
use stdx::process::streaming_output; use stdx::{process::streaming_output, JodChild};
pub use cargo_metadata::diagnostic::{ pub use cargo_metadata::diagnostic::{
Applicability, Diagnostic, DiagnosticCode, DiagnosticLevel, DiagnosticSpan, Applicability, Diagnostic, DiagnosticCode, DiagnosticLevel, DiagnosticSpan,
@ -117,7 +121,7 @@ struct FlycheckActor {
sender: Box<dyn Fn(Message) + Send>, sender: Box<dyn Fn(Message) + Send>,
config: FlycheckConfig, config: FlycheckConfig,
workspace_root: AbsPathBuf, workspace_root: AbsPathBuf,
/// WatchThread exists to wrap around the communication needed to be able to /// CargoHandle exists to wrap around the communication needed to be able to
/// run `cargo check` without blocking. Currently the Rust standard library /// run `cargo check` without blocking. Currently the Rust standard library
/// doesn't provide a way to read sub-process output without blocking, so we /// doesn't provide a way to read sub-process output without blocking, so we
/// have to wrap sub-processes output handling in a thread and pass messages /// have to wrap sub-processes output handling in a thread and pass messages
@ -153,14 +157,24 @@ fn run(mut self, inbox: Receiver<Restart>) {
while let Some(event) = self.next_event(&inbox) { while let Some(event) = self.next_event(&inbox) {
match event { match event {
Event::Restart(Restart) => { Event::Restart(Restart) => {
// Drop and cancel the previously spawned process
self.cargo_handle.take();
while let Ok(Restart) = inbox.recv_timeout(Duration::from_millis(50)) {} while let Ok(Restart) = inbox.recv_timeout(Duration::from_millis(50)) {}
self.cancel_check_process(); self.cancel_check_process();
let command = self.check_command(); let command = self.check_command();
tracing::info!("restart flycheck {:?}", command); let command_f = format!("restart flycheck {command:?}");
self.cargo_handle = Some(CargoHandle::spawn(command)); match CargoHandle::spawn(command) {
self.progress(Progress::DidStart); Ok(cargo_handle) => {
tracing::info!("{}", command_f);
self.cargo_handle = Some(cargo_handle);
self.progress(Progress::DidStart);
}
Err(e) => {
tracing::error!("{command_f} failed: {e:?}",);
}
}
} }
Event::CheckEvent(None) => { Event::CheckEvent(None) => {
// Watcher finished, replace it with a never channel to // Watcher finished, replace it with a never channel to
@ -249,37 +263,58 @@ fn send(&self, check_task: Message) {
} }
} }
/// A handle to a cargo process used for fly-checking.
struct CargoHandle { struct CargoHandle {
thread: jod_thread::JoinHandle<io::Result<()>>, /// The handle to the actual cargo process. As we cannot cancel directly from with
/// a read syscall dropping and therefor terminating the process is our best option.
child: JodChild,
thread: jod_thread::JoinHandle<io::Result<(bool, String)>>,
receiver: Receiver<CargoMessage>, receiver: Receiver<CargoMessage>,
} }
impl CargoHandle { impl CargoHandle {
fn spawn(command: Command) -> CargoHandle { fn spawn(mut command: Command) -> std::io::Result<CargoHandle> {
command.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(Stdio::null());
let mut child = JodChild::spawn(command)?;
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let (sender, receiver) = unbounded(); let (sender, receiver) = unbounded();
let actor = CargoActor::new(sender); let actor = CargoActor::new(sender, stdout, stderr);
let thread = jod_thread::Builder::new() let thread = jod_thread::Builder::new()
.name("CargoHandle".to_owned()) .name("CargoHandle".to_owned())
.spawn(move || actor.run(command)) .spawn(move || actor.run())
.expect("failed to spawn thread"); .expect("failed to spawn thread");
CargoHandle { thread, receiver } Ok(CargoHandle { child, thread, receiver })
} }
fn join(self) -> io::Result<()> { fn join(self) -> io::Result<()> {
self.thread.join() let exit_status = self.child.wait()?;
let (read_at_least_one_message, error) = self.thread.join()?;
if read_at_least_one_message || exit_status.success() {
Ok(())
} else {
Err(io::Error::new(io::ErrorKind::Other, format!(
"Cargo watcher failed, the command produced no valid metadata (exit code: {:?}):\n{}",
exit_status, error
)))
}
} }
} }
struct CargoActor { struct CargoActor {
sender: Sender<CargoMessage>, sender: Sender<CargoMessage>,
stdout: ChildStdout,
stderr: ChildStderr,
} }
impl CargoActor { impl CargoActor {
fn new(sender: Sender<CargoMessage>) -> CargoActor { fn new(sender: Sender<CargoMessage>, stdout: ChildStdout, stderr: ChildStderr) -> CargoActor {
CargoActor { sender } CargoActor { sender, stdout, stderr }
} }
fn run(self, command: Command) -> io::Result<()> { fn run(self) -> io::Result<(bool, String)> {
// We manually read a line at a time, instead of using serde's // We manually read a line at a time, instead of using serde's
// stream deserializers, because the deserializer cannot recover // stream deserializers, because the deserializer cannot recover
// from an error, resulting in it getting stuck, because we try to // from an error, resulting in it getting stuck, because we try to
@ -292,7 +327,8 @@ fn run(self, command: Command) -> io::Result<()> {
let mut error = String::new(); let mut error = String::new();
let mut read_at_least_one_message = false; let mut read_at_least_one_message = false;
let output = streaming_output( let output = streaming_output(
command, self.stdout,
self.stderr,
&mut |line| { &mut |line| {
read_at_least_one_message = true; read_at_least_one_message = true;
@ -325,14 +361,7 @@ fn run(self, command: Command) -> io::Result<()> {
}, },
); );
match output { match output {
Ok(_) if read_at_least_one_message => Ok(()), Ok(_) => Ok((read_at_least_one_message, error)),
Ok(output) if output.status.success() => Ok(()),
Ok(output) => {
Err(io::Error::new(io::ErrorKind::Other, format!(
"Cargo watcher failed, the command produced no valid metadata (exit code: {:?}):\n{}",
output.status, error
)))
}
Err(e) => Err(io::Error::new(e.kind(), format!("{:?}: {}", e, error))), Err(e) => Err(io::Error::new(e.kind(), format!("{:?}: {}", e, error))),
} }
} }

View File

@ -110,7 +110,7 @@ pub(crate) fn run(
}; };
tracing::info!("Running build scripts: {:?}", cmd); tracing::info!("Running build scripts: {:?}", cmd);
let output = stdx::process::streaming_output( let output = stdx::process::spawn_with_streaming_output(
cmd, cmd,
&mut |line| { &mut |line| {
// Copy-pasted from existing cargo_metadata. It seems like we // Copy-pasted from existing cargo_metadata. It seems like we

View File

@ -1,5 +1,6 @@
//! Missing batteries for standard libraries. //! Missing batteries for standard libraries.
use std::iter; use std::iter;
use std::process::Command;
use std::{cmp::Ordering, ops, time::Instant}; use std::{cmp::Ordering, ops, time::Instant};
mod macros; mod macros;
@ -132,6 +133,7 @@ fn drop(&mut self) {
D(Some(f)) D(Some(f))
} }
/// A [`std::process::Child`] wrapper that will kill the child on drop.
#[cfg_attr(not(target_arch = "wasm32"), repr(transparent))] #[cfg_attr(not(target_arch = "wasm32"), repr(transparent))]
#[derive(Debug)] #[derive(Debug)]
pub struct JodChild(pub std::process::Child); pub struct JodChild(pub std::process::Child);
@ -157,6 +159,16 @@ fn drop(&mut self) {
} }
impl JodChild { impl JodChild {
pub fn spawn(mut command: Command) -> std::io::Result<Self> {
command.spawn().map(Self)
}
pub fn wait(self) -> std::io::Result<std::process::ExitStatus> {
let mut inner = self.into_inner();
let _ = inner.kill();
inner.wait()
}
pub fn into_inner(self) -> std::process::Child { pub fn into_inner(self) -> std::process::Child {
if cfg!(target_arch = "wasm32") { if cfg!(target_arch = "wasm32") {
panic!("no processes on wasm"); panic!("no processes on wasm");

View File

@ -5,54 +5,66 @@
use std::{ use std::{
io, io,
process::{Command, Output, Stdio}, process::{ChildStderr, ChildStdout, Command, Output, Stdio},
}; };
use crate::JodChild;
pub fn streaming_output( pub fn streaming_output(
out: ChildStdout,
err: ChildStderr,
on_stdout_line: &mut dyn FnMut(&str),
on_stderr_line: &mut dyn FnMut(&str),
) -> io::Result<(Vec<u8>, Vec<u8>)> {
let mut stdout = Vec::new();
let mut stderr = Vec::new();
imp::read2(out, err, &mut |is_out, data, eof| {
let idx = if eof {
data.len()
} else {
match data.iter().rposition(|b| *b == b'\n') {
Some(i) => i + 1,
None => return,
}
};
{
// scope for new_lines
let new_lines = {
let dst = if is_out { &mut stdout } else { &mut stderr };
let start = dst.len();
let data = data.drain(..idx);
dst.extend(data);
&dst[start..]
};
for line in String::from_utf8_lossy(new_lines).lines() {
if is_out {
on_stdout_line(line);
} else {
on_stderr_line(line);
}
}
}
})?;
Ok((stdout, stderr))
}
pub fn spawn_with_streaming_output(
mut cmd: Command, mut cmd: Command,
on_stdout_line: &mut dyn FnMut(&str), on_stdout_line: &mut dyn FnMut(&str),
on_stderr_line: &mut dyn FnMut(&str), on_stderr_line: &mut dyn FnMut(&str),
) -> io::Result<Output> { ) -> io::Result<Output> {
let mut stdout = Vec::new();
let mut stderr = Vec::new();
let cmd = cmd.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(Stdio::null()); let cmd = cmd.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(Stdio::null());
let status = { let mut child = JodChild(cmd.spawn()?);
let mut child = cmd.spawn()?; let (stdout, stderr) = streaming_output(
let out = child.stdout.take().unwrap(); child.stdout.take().unwrap(),
let err = child.stderr.take().unwrap(); child.stderr.take().unwrap(),
imp::read2(out, err, &mut |is_out, data, eof| { on_stdout_line,
let idx = if eof { on_stderr_line,
data.len() )?;
} else { let status = child.wait()?;
match data.iter().rposition(|b| *b == b'\n') {
Some(i) => i + 1,
None => return,
}
};
{
// scope for new_lines
let new_lines = {
let dst = if is_out { &mut stdout } else { &mut stderr };
let start = dst.len();
let data = data.drain(..idx);
dst.extend(data);
&dst[start..]
};
for line in String::from_utf8_lossy(new_lines).lines() {
if is_out {
on_stdout_line(line);
} else {
on_stderr_line(line);
}
}
}
})?;
let _ = child.kill();
child.wait()?
};
Ok(Output { status, stdout, stderr }) Ok(Output { status, stdout, stderr })
} }