2853: Manage `cargo check` state updates in `main_loop` to reduce lock contention r=matklad a=kiljacken

State is now updated exclusively from `main_loop`, so multiple threads can no longer compete for the lock. Updates to the state are requested via the existing task channel.

Also updates some names so they make slightly more sense.

Based upon an idea/suggestion from @matklad on Zulip:

> I think I've noticed at least something suspicious!
> 
> In WorldSnapshot, we store an Arc<RwLock<CheckWatcherSharedState>>. We take a read lock on it in handle_diagnostics.
> 
> Additionally, we .write this lock from the watcher thread in CheckWatcherState::run.
> 
> I think in general this is less than ideal, b/c a diagnostics request can be blocked on another thread. I think it makes sense to architect this in a way which does not block.
>
> For that, we stop sharing the state between ServerWorld and CheckWatcherState. Instead, the watcher thread sends new diagnostics via a channel, and we accommodate those diagnostics into the server state in the main loop.
> 
> So, instead of:
> ```rust
> struct Server {
>     diagnostics: Arc<Mutex<Vec<Diagnostics>>>,
> }
> 
> struct Watcher {
>     diagnostics: Arc<Mutex<Vec<Diagnostics>>>,
> }
> ```
> we'll have something like this:
> ```rust
> struct Server {
>     // this bit now *owns* diagnostics
>     diagnostics: Vec<Diagnostics>,
> }
> 
> struct Watcher {
>     diagnostics_sink: Sender<Vec<Diagnostics>>,
> }
> ```
> I am not sure this is the culprit of slowness on Windows, but I think we should fix it, because it's very useful when all changes to the server's state can occur only via the main loop.
> 
> Note how VFS is set up in a similar way: instead of modifying some global hash map with files, VFS sends a message to the main loop that hey, I have these new files for you. The main loop then incorporates the changes itself.
> 
> Note that I think we'll still need some locks here, to share the state between ServerWorld and WorldSnapshot, but we won't actually be changing anything mid-snapshot.

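To make the quoted idea concrete, here is a minimal, self-contained sketch of the channel-based flow the PR adopts. The types are hypothetical stand-ins (the real code sends `CheckTask` values over a crossbeam-channel), so treat this as an illustration of the shape, not the actual API:

```rust
use crossbeam_channel::unbounded;
use std::thread;

// Hypothetical stand-in for the real CheckTask enum.
enum Task {
    AddDiagnostic(String),
    ClearDiagnostics,
}

fn main() {
    let (task_send, task_recv) = unbounded::<Task>();

    // Watcher thread: owns no shared state, only a Sender.
    let watcher = thread::spawn(move || {
        task_send.send(Task::AddDiagnostic("E0308: mismatched types".into())).unwrap();
        task_send.send(Task::ClearDiagnostics).unwrap();
        // The Sender is dropped here, which ends the receive loop below.
    });

    // Main loop: sole owner and sole mutator of the diagnostics state,
    // so request handlers never race a watcher thread for a write lock.
    let mut diagnostics: Vec<String> = Vec::new();
    for task in task_recv {
        match task {
            Task::AddDiagnostic(d) => diagnostics.push(d),
            Task::ClearDiagnostics => diagnostics.clear(),
        }
    }
    watcher.join().unwrap();
}
```

A nice side effect of the channel is ordering: a clear request sent after a batch of diagnostics cannot overtake them, which a free-for-all on a shared lock does not guarantee.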

Co-authored-by: Emil Lauridsen <mine809@gmail.com>
bors[bot] committed 2020-01-15 15:37:28 +00:00 (commit c0661ce744)
3 changed files with 97 additions and 68 deletions

View File

@@ -38,7 +38,7 @@ pub struct CheckOptions {
 #[derive(Debug)]
 pub struct CheckWatcher {
     pub task_recv: Receiver<CheckTask>,
-    pub shared: Arc<RwLock<CheckWatcherSharedState>>,
+    pub state: Arc<RwLock<CheckState>>,
     cmd_send: Option<Sender<CheckCommand>>,
     handle: Option<JoinHandle<()>>,
 }
@@ -46,22 +46,21 @@ pub struct CheckWatcher {
 impl CheckWatcher {
     pub fn new(options: &CheckOptions, workspace_root: PathBuf) -> CheckWatcher {
         let options = options.clone();
-        let shared = Arc::new(RwLock::new(CheckWatcherSharedState::new()));
+        let state = Arc::new(RwLock::new(CheckState::new()));

         let (task_send, task_recv) = unbounded::<CheckTask>();
         let (cmd_send, cmd_recv) = unbounded::<CheckCommand>();
-        let shared_ = shared.clone();
         let handle = std::thread::spawn(move || {
-            let mut check = CheckWatcherState::new(options, workspace_root, shared_);
+            let mut check = CheckWatcherThread::new(options, workspace_root);
             check.run(&task_send, &cmd_recv);
         });
-        CheckWatcher { task_recv, cmd_send: Some(cmd_send), handle: Some(handle), shared }
+        CheckWatcher { task_recv, cmd_send: Some(cmd_send), handle: Some(handle), state }
     }

     /// Returns a CheckWatcher that doesn't actually do anything
     pub fn dummy() -> CheckWatcher {
-        let shared = Arc::new(RwLock::new(CheckWatcherSharedState::new()));
-        CheckWatcher { task_recv: never(), cmd_send: None, handle: None, shared }
+        let state = Arc::new(RwLock::new(CheckState::new()));
+        CheckWatcher { task_recv: never(), cmd_send: None, handle: None, state }
     }

     /// Schedule a re-start of the cargo check worker.
@@ -89,14 +88,14 @@ impl std::ops::Drop for CheckWatcher {
 }

 #[derive(Debug)]
-pub struct CheckWatcherSharedState {
+pub struct CheckState {
     diagnostic_collection: HashMap<Url, Vec<Diagnostic>>,
     suggested_fix_collection: HashMap<Url, Vec<SuggestedFix>>,
 }

-impl CheckWatcherSharedState {
-    fn new() -> CheckWatcherSharedState {
-        CheckWatcherSharedState {
+impl CheckState {
+    fn new() -> CheckState {
+        CheckState {
             diagnostic_collection: HashMap::new(),
             suggested_fix_collection: HashMap::new(),
         }
@@ -104,15 +103,11 @@ impl CheckWatcherSharedState {
     /// Clear the cached diagnostics, and schedule updating diagnostics by the
     /// server, to clear stale results.
-    pub fn clear(&mut self, task_send: &Sender<CheckTask>) {
+    pub fn clear(&mut self) -> Vec<Url> {
         let cleared_files: Vec<Url> = self.diagnostic_collection.keys().cloned().collect();

         self.diagnostic_collection.clear();
         self.suggested_fix_collection.clear();
-
-        for uri in cleared_files {
-            task_send.send(CheckTask::Update(uri.clone())).unwrap();
-        }
+        cleared_files
     }

     pub fn diagnostics_for(&self, uri: &Url) -> Option<&[Diagnostic]> {
@@ -123,6 +118,13 @@ impl CheckWatcherSharedState {
         self.suggested_fix_collection.get(uri).map(|d| d.as_slice())
     }

+    pub fn add_diagnostic_with_fixes(&mut self, file_uri: Url, diagnostic: DiagnosticWithFixes) {
+        for fix in diagnostic.suggested_fixes {
+            self.add_suggested_fix_for_diagnostic(fix, &diagnostic.diagnostic);
+        }
+        self.add_diagnostic(file_uri, diagnostic.diagnostic);
+    }
+
     fn add_diagnostic(&mut self, file_uri: Url, diagnostic: Diagnostic) {
         let diagnostics = self.diagnostic_collection.entry(file_uri).or_default();
@@ -158,8 +160,11 @@ impl CheckWatcherSharedState {
 #[derive(Debug)]
 pub enum CheckTask {
-    /// Request a update of the given files diagnostics
-    Update(Url),
+    /// Request a clearing of all cached diagnostics from the check watcher
+    ClearDiagnostics,
+
+    /// Request adding a diagnostic with fixes included to a file
+    AddDiagnostic(Url, DiagnosticWithFixes),

     /// Request check progress notification to client
     Status(WorkDoneProgress),
@@ -170,26 +175,20 @@ pub enum CheckCommand {
     Update,
 }

-struct CheckWatcherState {
+struct CheckWatcherThread {
     options: CheckOptions,
     workspace_root: PathBuf,
     watcher: WatchThread,
     last_update_req: Option<Instant>,
-    shared: Arc<RwLock<CheckWatcherSharedState>>,
 }

-impl CheckWatcherState {
-    fn new(
-        options: CheckOptions,
-        workspace_root: PathBuf,
-        shared: Arc<RwLock<CheckWatcherSharedState>>,
-    ) -> CheckWatcherState {
-        CheckWatcherState {
+impl CheckWatcherThread {
+    fn new(options: CheckOptions, workspace_root: PathBuf) -> CheckWatcherThread {
+        CheckWatcherThread {
             options,
             workspace_root,
             watcher: WatchThread::dummy(),
             last_update_req: None,
-            shared,
         }
     }
@@ -215,7 +214,7 @@ impl CheckWatcherState {
         if self.should_recheck() {
             self.last_update_req.take();
-            self.shared.write().clear(task_send);
+            task_send.send(CheckTask::ClearDiagnostics).unwrap();

             // By replacing the watcher, we drop the previous one which
             // causes it to shut down automatically.
@@ -240,7 +239,7 @@ impl CheckWatcherState {
         }
     }

-    fn handle_message(&mut self, msg: CheckEvent, task_send: &Sender<CheckTask>) {
+    fn handle_message(&self, msg: CheckEvent, task_send: &Sender<CheckTask>) {
         match msg {
             CheckEvent::Begin => {
                 task_send
@@ -279,18 +278,9 @@ impl CheckWatcherState {
             };

             let MappedRustDiagnostic { location, diagnostic, suggested_fixes } = map_result;
-            let file_uri = location.uri.clone();

-            if !suggested_fixes.is_empty() {
-                for suggested_fix in suggested_fixes {
-                    self.shared
-                        .write()
-                        .add_suggested_fix_for_diagnostic(suggested_fix, &diagnostic);
-                }
-            }
-            self.shared.write().add_diagnostic(file_uri, diagnostic);
-
-            task_send.send(CheckTask::Update(location.uri)).unwrap();
+            let diagnostic = DiagnosticWithFixes { diagnostic, suggested_fixes };
+            task_send.send(CheckTask::AddDiagnostic(location.uri, diagnostic)).unwrap();
         }

         CheckEvent::Msg(Message::BuildScriptExecuted(_msg)) => {}
@@ -299,6 +289,12 @@ impl CheckWatcherState {
     }
 }

+#[derive(Debug)]
+pub struct DiagnosticWithFixes {
+    diagnostic: Diagnostic,
+    suggested_fixes: Vec<SuggestedFix>,
+}
+
 /// WatchThread exists to wrap around the communication needed to be able to
 /// run `cargo check` without blocking. Currently the Rust standard library
 /// doesn't provide a way to read sub-process output without blocking, so we

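One detail worth calling out in the file above: `clear()` no longer pushes `CheckTask::Update` messages itself; it returns the affected URLs and leaves the follow-up to its caller (the main loop). A rough sketch of that drain-and-return pattern, with simplified stand-in types rather than the real signatures:

```rust
use std::collections::HashMap;

// Simplified stand-in for CheckState's diagnostic_collection.
struct State {
    diagnostics: HashMap<String, Vec<String>>, // url -> diagnostics
}

impl State {
    /// Clear all cached diagnostics and return the affected URLs,
    /// so the caller can publish empty diagnostics for each of them.
    fn clear(&mut self) -> Vec<String> {
        let cleared: Vec<String> = self.diagnostics.keys().cloned().collect();
        self.diagnostics.clear();
        cleared
    }
}

fn main() {
    let mut state = State { diagnostics: HashMap::new() };
    state.diagnostics.insert("file:///a.rs".into(), vec!["E0308".into()]);

    for url in state.clear() {
        // In the real main loop this is publish_diagnostics_for_url.
        println!("publish empty diagnostics for {}", url);
    }
}
```

This keeps the state type free of any channel dependency: deciding what to do about a cleared file becomes the main loop's job.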
View File

@@ -9,7 +9,7 @@ use std::{error::Error, fmt, panic, path::PathBuf, sync::Arc, time::Instant};
 use crossbeam_channel::{select, unbounded, RecvError, Sender};
 use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response};
-use lsp_types::{ClientCapabilities, NumberOrString};
+use lsp_types::{ClientCapabilities, NumberOrString, Url};
 use ra_cargo_watch::{CheckOptions, CheckTask};
 use ra_ide::{Canceled, FeatureFlags, FileId, LibraryData, SourceRootId};
 use ra_prof::profile;
@@ -336,28 +336,7 @@ fn loop_turn(
             world_state.maybe_collect_garbage();
             loop_state.in_flight_libraries -= 1;
         }
-        Event::CheckWatcher(task) => match task {
-            CheckTask::Update(uri) => {
-                // We manually send a diagnostic update when the watcher asks
-                // us to, to avoid the issue of having to change the file to
-                // receive updated diagnostics.
-                let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
-                if let Some(file_id) = world_state.vfs.read().path2file(&path) {
-                    let params =
-                        handlers::publish_diagnostics(&world_state.snapshot(), FileId(file_id.0))?;
-                    let not = notification_new::<req::PublishDiagnostics>(params);
-                    task_sender.send(Task::Notify(not)).unwrap();
-                }
-            }
-            CheckTask::Status(progress) => {
-                let params = req::ProgressParams {
-                    token: req::ProgressToken::String("rustAnalyzer/cargoWatcher".to_string()),
-                    value: req::ProgressParamsValue::WorkDone(progress),
-                };
-                let not = notification_new::<req::Progress>(params);
-                task_sender.send(Task::Notify(not)).unwrap();
-            }
-        },
+        Event::CheckWatcher(task) => on_check_task(task, world_state, task_sender)?,
         Event::Msg(msg) => match msg {
             Message::Request(req) => on_request(
                 world_state,
@@ -605,6 +584,60 @@ fn on_notification(
     Ok(())
 }

+fn on_check_task(
+    task: CheckTask,
+    world_state: &WorldState,
+    task_sender: &Sender<Task>,
+) -> Result<()> {
+    match task {
+        CheckTask::ClearDiagnostics => {
+            let cleared_files = world_state.check_watcher.state.write().clear();
+
+            // Send updated diagnostics for each cleared file
+            for url in cleared_files {
+                publish_diagnostics_for_url(&url, world_state, task_sender)?;
+            }
+        }
+
+        CheckTask::AddDiagnostic(url, diagnostic) => {
+            world_state
+                .check_watcher
+                .state
+                .write()
+                .add_diagnostic_with_fixes(url.clone(), diagnostic);
+
+            // We manually send a diagnostic update when the watcher asks
+            // us to, to avoid the issue of having to change the file to
+            // receive updated diagnostics.
+            publish_diagnostics_for_url(&url, world_state, task_sender)?;
+        }
+
+        CheckTask::Status(progress) => {
+            let params = req::ProgressParams {
+                token: req::ProgressToken::String("rustAnalyzer/cargoWatcher".to_string()),
+                value: req::ProgressParamsValue::WorkDone(progress),
+            };
+            let not = notification_new::<req::Progress>(params);
+            task_sender.send(Task::Notify(not)).unwrap();
+        }
+    }
+
+    Ok(())
+}
+
+fn publish_diagnostics_for_url(
+    url: &Url,
+    world_state: &WorldState,
+    task_sender: &Sender<Task>,
+) -> Result<()> {
+    let path = url.to_file_path().map_err(|()| format!("invalid uri: {}", url))?;
+    if let Some(file_id) = world_state.vfs.read().path2file(&path) {
+        let params = handlers::publish_diagnostics(&world_state.snapshot(), FileId(file_id.0))?;
+        let not = notification_new::<req::PublishDiagnostics>(params);
+        task_sender.send(Task::Notify(not)).unwrap();
+    }
+    Ok(())
+}
+
 struct PoolDispatcher<'a> {
     req: Option<Request>,
     pool: &'a ThreadPool,

View File

@@ -13,7 +13,7 @@ use lsp_server::ErrorCode;
 use lsp_types::Url;
 use parking_lot::RwLock;
 use ra_cargo_watch::{
-    url_from_path_with_drive_lowercasing, CheckOptions, CheckWatcher, CheckWatcherSharedState,
+    url_from_path_with_drive_lowercasing, CheckOptions, CheckState, CheckWatcher,
 };
 use ra_ide::{
     Analysis, AnalysisChange, AnalysisHost, CrateGraph, FeatureFlags, FileId, LibraryData,
@@ -64,7 +64,7 @@ pub struct WorldSnapshot {
     pub analysis: Analysis,
     pub vfs: Arc<RwLock<Vfs>>,
     pub latest_requests: Arc<RwLock<LatestRequests>>,
-    pub check_watcher: Arc<RwLock<CheckWatcherSharedState>>,
+    pub check_watcher: Arc<RwLock<CheckState>>,
 }

 impl WorldState {
@@ -220,7 +220,7 @@ impl WorldState {
             analysis: self.analysis_host.analysis(),
             vfs: Arc::clone(&self.vfs),
             latest_requests: Arc::clone(&self.latest_requests),
-            check_watcher: self.check_watcher.shared.clone(),
+            check_watcher: self.check_watcher.state.clone(),
         }
     }
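Snapshots still share the `CheckState` behind an `Arc<RwLock<...>>`, but after this change the write lock is only ever taken from the main loop. A minimal sketch of that read/write split, using a hypothetical state type and `parking_lot::RwLock` as the file above does:

```rust
use parking_lot::RwLock;
use std::sync::Arc;

// Hypothetical stand-in for CheckState.
#[derive(Default)]
struct CheckState {
    diagnostics: Vec<String>,
}

fn main() {
    let state = Arc::new(RwLock::new(CheckState::default()));

    // Main loop: the only writer, driven by incoming CheckTask messages.
    state.write().diagnostics.push("warning: unused variable".into());

    // WorldSnapshot: takes a short-lived read lock while answering a
    // request. It can no longer block on a watcher thread holding the
    // write lock, because the watcher never touches this state directly.
    let snapshot = Arc::clone(&state);
    let n = snapshot.read().diagnostics.len();
    assert_eq!(n, 1);
}
```

Read locks from concurrent request handlers can still overlap freely; only the main loop's brief writes exclude them, which is the reduced contention the PR title refers to.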