Consistent, hopefully robust, shutdown/cancelation story for cargo check subprocess

This commit is contained in:
Emil Lauridsen 2019-12-27 11:31:25 +01:00
parent 428a6ff5b8
commit a2d10694cc

View File

@ -2,7 +2,7 @@
//! another compatible command (f.x. clippy) in a background thread and provide
//! LSP diagnostics based on the output of the command.
use cargo_metadata::Message;
use crossbeam_channel::{select, unbounded, Receiver, RecvError, Sender, TryRecvError};
use crossbeam_channel::{select, unbounded, Receiver, RecvError, Sender};
use lsp_types::{
Diagnostic, Url, WorkDoneProgress, WorkDoneProgressBegin, WorkDoneProgressEnd,
WorkDoneProgressReport,
@ -191,7 +191,8 @@ pub fn run(&mut self, task_send: &Sender<CheckTask>, cmd_recv: &Receiver<CheckCo
self.last_update_req.take();
self.shared.write().clear(task_send);
self.watcher.cancel();
// By replacing the watcher, we drop the previous one which
// causes it to shut down automatically.
self.watcher = WatchThread::new(&self.options, &self.workspace_root);
}
}
@ -277,9 +278,11 @@ fn handle_message(&mut self, msg: CheckEvent, task_send: &Sender<CheckTask>) {
/// doesn't provide a way to read sub-process output without blocking, so we
/// have to wrap sub-processes output handling in a thread and pass messages
/// back over a channel.
/// The correct way to dispose of the thread is to drop it, on which the
/// sub-process will be killed, and the thread will be joined.
struct WatchThread {
handle: Option<JoinHandle<()>>,
message_recv: Receiver<CheckEvent>,
cancel_send: Sender<()>,
}
enum CheckEvent {
@ -302,9 +305,8 @@ fn new(options: &CheckOptions, workspace_root: &PathBuf) -> WatchThread {
args.extend(options.args.iter().cloned());
let (message_send, message_recv) = unbounded();
let (cancel_send, cancel_recv) = unbounded();
let enabled = options.enable;
std::thread::spawn(move || {
let handle = std::thread::spawn(move || {
if !enabled {
return;
}
@ -316,24 +318,56 @@ fn new(options: &CheckOptions, workspace_root: &PathBuf) -> WatchThread {
.spawn()
.expect("couldn't launch cargo");
message_send.send(CheckEvent::Begin).unwrap();
// If we trigger an error here, we will do so in the loop instead,
// which will break out of the loop, and continue the shutdown
let _ = message_send.send(CheckEvent::Begin);
for message in cargo_metadata::parse_messages(command.stdout.take().unwrap()) {
match cancel_recv.try_recv() {
Ok(()) | Err(TryRecvError::Disconnected) => {
command.kill().expect("couldn't kill command");
let message = match message {
Ok(message) => message,
Err(err) => {
log::error!("Invalid json from cargo check, ignoring: {}", err);
continue;
}
};
match message_send.send(CheckEvent::Msg(message)) {
Ok(()) => {}
Err(_err) => {
// The send channel was closed, so we want to shutdown
break;
}
Err(TryRecvError::Empty) => (),
}
message_send.send(CheckEvent::Msg(message.unwrap())).unwrap();
}
message_send.send(CheckEvent::End).unwrap();
});
WatchThread { message_recv, cancel_send }
}
fn cancel(&self) {
let _ = self.cancel_send.send(());
// We can ignore any error here, as we are already in the progress
// of shutting down.
let _ = message_send.send(CheckEvent::End);
// It is okay to ignore the result, as it only errors if the process is already dead
let _ = command.kill();
// Again, we don't care about the exit status so just ignore the result
let _ = command.wait();
});
WatchThread { handle: Some(handle), message_recv }
}
}
impl std::ops::Drop for WatchThread {
fn drop(&mut self) {
if let Some(handle) = self.handle.take() {
// Replace our reciever with dummy one, so we can drop and close the
// one actually communicating with the thread
let recv = std::mem::replace(&mut self.message_recv, crossbeam_channel::never());
// Dropping the original reciever initiates thread sub-process shutdown
drop(recv);
// Join the thread, it should finish shortly. We don't really care
// whether it panicked, so it is safe to ignore the result
let _ = handle.join();
}
}
}