Merge LoopState into GlobalState

This commit is contained in:
Aleksey Kladov 2020-06-25 00:17:11 +02:00
parent bff7307b8c
commit 19b063e055
2 changed files with 64 additions and 79 deletions

View File

@ -20,11 +20,12 @@ use crate::{
diagnostics::{CheckFixes, DiagnosticCollection},
from_proto,
line_endings::LineEndings,
main_loop::ReqQueue,
request_metrics::{LatestRequests, RequestMetrics},
to_proto::url_from_abs_path,
Result,
};
use rustc_hash::FxHashMap;
use rustc_hash::{FxHashMap, FxHashSet};
fn create_flycheck(workspaces: &[ProjectWorkspace], config: &FlycheckConfig) -> Option<Flycheck> {
// FIXME: Figure out the multi-workspace situation
@ -40,12 +41,23 @@ fn create_flycheck(workspaces: &[ProjectWorkspace], config: &FlycheckConfig) ->
})
}
#[derive(Eq, PartialEq)]
pub(crate) enum Status {
Loading,
Ready,
}
impl Default for Status {
fn default() -> Self {
Status::Loading
}
}
/// `GlobalState` is the primary mutable state of the language server
///
/// The most interesting components are `vfs`, which stores a consistent
/// snapshot of the file systems, and `analysis_host`, which stores our
/// incremental salsa database.
#[derive(Debug)]
pub(crate) struct GlobalState {
pub(crate) config: Config,
pub(crate) workspaces: Arc<Vec<ProjectWorkspace>>,
@ -54,10 +66,13 @@ pub(crate) struct GlobalState {
pub(crate) task_receiver: Receiver<vfs::loader::Message>,
pub(crate) flycheck: Option<Flycheck>,
pub(crate) diagnostics: DiagnosticCollection,
pub(crate) proc_macro_client: ProcMacroClient,
pub(crate) mem_docs: FxHashSet<VfsPath>,
pub(crate) vfs: Arc<RwLock<(vfs::Vfs, FxHashMap<FileId, LineEndings>)>>,
pub(crate) status: Status,
pub(crate) req_queue: ReqQueue,
pub(crate) latest_requests: Arc<RwLock<LatestRequests>>,
source_root_config: SourceRootConfig,
_proc_macro_client: ProcMacroClient,
}
/// An immutable snapshot of the world's state at a point in time.
@ -75,6 +90,7 @@ impl GlobalState {
workspaces: Vec<ProjectWorkspace>,
lru_capacity: Option<usize>,
config: Config,
req_queue: ReqQueue,
) -> GlobalState {
let mut change = AnalysisChange::new();
@ -136,13 +152,16 @@ impl GlobalState {
workspaces: Arc::new(workspaces),
analysis_host,
loader,
vfs: Arc::new(RwLock::new((vfs, FxHashMap::default()))),
task_receiver,
latest_requests: Default::default(),
flycheck,
diagnostics: Default::default(),
proc_macro_client,
mem_docs: FxHashSet::default(),
vfs: Arc::new(RwLock::new((vfs, FxHashMap::default()))),
status: Status::default(),
req_queue,
latest_requests: Default::default(),
source_root_config: project_folders.source_root_config,
_proc_macro_client: proc_macro_client,
};
res.process_changes();
res

View File

@ -11,16 +11,13 @@ use std::{
};
use crossbeam_channel::{never, select, unbounded, RecvError, Sender};
use lsp_server::{
Connection, ErrorCode, Message, Notification, ReqQueue, Request, RequestId, Response,
};
use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response};
use lsp_types::{request::Request as _, NumberOrString, TextDocumentContentChangeEvent};
use ra_db::VfsPath;
use ra_flycheck::CheckTask;
use ra_ide::{Canceled, FileId, LineIndex};
use ra_prof::profile;
use ra_project_model::{PackageRoot, ProjectWorkspace};
use rustc_hash::FxHashSet;
use serde::{de::DeserializeOwned, Serialize};
use threadpool::ThreadPool;
@ -28,7 +25,7 @@ use crate::{
config::{Config, FilesWatcher, LinkedProject},
diagnostics::DiagnosticTask,
from_proto,
global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot},
global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot, Status},
handlers, lsp_ext,
request_metrics::RequestMetrics,
Result,
@ -78,7 +75,6 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
SetThreadPriority(thread, thread_priority_above_normal);
}
let mut loop_state = LoopState::default();
let mut global_state = {
let workspaces = {
if config.linked_projects.is_empty() && config.notifications.cargo_toml_not_found {
@ -116,6 +112,8 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
.collect::<Vec<_>>()
};
let mut req_queue = ReqQueue::default();
if let FilesWatcher::Client = config.files.watcher {
let registration_options = lsp_types::DidChangeWatchedFilesRegistrationOptions {
watchers: workspaces
@ -132,7 +130,7 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
register_options: Some(serde_json::to_value(registration_options).unwrap()),
};
let params = lsp_types::RegistrationParams { registrations: vec![registration] };
let request = loop_state.req_queue.outgoing.register(
let request = req_queue.outgoing.register(
lsp_types::request::RegisterCapability::METHOD.to_string(),
params,
DO_NOTHING,
@ -140,7 +138,7 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
connection.sender.send(request.into()).unwrap();
}
GlobalState::new(workspaces, config.lru_capacity, config)
GlobalState::new(workspaces, config.lru_capacity, config, req_queue)
};
let pool = ThreadPool::default();
@ -172,15 +170,13 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
};
}
assert!(!global_state.vfs.read().0.has_changes());
loop_turn(&pool, &task_sender, &connection, &mut global_state, &mut loop_state, event)?;
loop_turn(&pool, &task_sender, &connection, &mut global_state, event)?;
assert!(!global_state.vfs.read().0.has_changes());
}
}
global_state.analysis_host.request_cancellation();
log::info!("waiting for tasks to finish...");
task_receiver.into_iter().for_each(|task| {
on_task(task, &connection.sender, &mut loop_state.req_queue.incoming, &mut global_state)
});
task_receiver.into_iter().for_each(|task| on_task(task, &connection.sender, &mut global_state));
log::info!("...tasks have finished");
log::info!("joining threadpool...");
pool.join();
@ -244,35 +240,15 @@ impl fmt::Debug for Event {
}
}
type ReqHandler = fn(&mut GlobalState, Response);
pub(crate) type ReqHandler = fn(&mut GlobalState, Response);
pub(crate) type ReqQueue = lsp_server::ReqQueue<(&'static str, Instant), ReqHandler>;
const DO_NOTHING: ReqHandler = |_, _| ();
type Incoming = lsp_server::Incoming<(&'static str, Instant)>;
#[derive(Default)]
struct LoopState {
req_queue: ReqQueue<(&'static str, Instant), ReqHandler>,
mem_docs: FxHashSet<VfsPath>,
status: Status,
}
#[derive(Eq, PartialEq)]
enum Status {
Loading,
Ready,
}
impl Default for Status {
fn default() -> Self {
Status::Loading
}
}
fn loop_turn(
pool: &ThreadPool,
task_sender: &Sender<Task>,
connection: &Connection,
global_state: &mut GlobalState,
loop_state: &mut LoopState,
event: Event,
) -> Result<()> {
let loop_start = Instant::now();
@ -288,7 +264,7 @@ fn loop_turn(
let mut became_ready = false;
match event {
Event::Task(task) => {
on_task(task, &connection.sender, &mut loop_state.req_queue.incoming, global_state);
on_task(task, &connection.sender, global_state);
global_state.maybe_collect_garbage();
}
Event::Vfs(task) => match task {
@ -296,35 +272,29 @@ fn loop_turn(
let vfs = &mut global_state.vfs.write().0;
for (path, contents) in files {
let path = VfsPath::from(path);
if !loop_state.mem_docs.contains(&path) {
if !global_state.mem_docs.contains(&path) {
vfs.set_file_contents(path, contents)
}
}
}
vfs::loader::Message::Progress { n_total, n_done } => {
if n_done == n_total {
loop_state.status = Status::Ready;
global_state.status = Status::Ready;
became_ready = true;
}
report_progress(loop_state, &connection.sender, n_done, n_total, "roots scanned")
report_progress(global_state, &connection.sender, n_done, n_total, "roots scanned")
}
},
Event::CheckWatcher(task) => on_check_task(task, global_state, task_sender)?,
Event::Msg(msg) => match msg {
Message::Request(req) => on_request(
global_state,
&mut loop_state.req_queue.incoming,
pool,
task_sender,
&connection.sender,
loop_start,
req,
)?,
Message::Request(req) => {
on_request(global_state, pool, task_sender, &connection.sender, loop_start, req)?
}
Message::Notification(not) => {
on_notification(&connection.sender, global_state, loop_state, not)?;
on_notification(&connection.sender, global_state, not)?;
}
Message::Response(resp) => {
let handler = loop_state.req_queue.outgoing.complete(resp.id.clone());
let handler = global_state.req_queue.outgoing.complete(resp.id.clone());
handler(global_state, resp)
}
},
@ -338,8 +308,8 @@ fn loop_turn(
}
}
if loop_state.status == Status::Ready && (state_changed || became_ready) {
let subscriptions = loop_state
if global_state.status == Status::Ready && (state_changed || became_ready) {
let subscriptions = global_state
.mem_docs
.iter()
.map(|path| global_state.vfs.read().0.file_id(&path).unwrap())
@ -373,18 +343,15 @@ fn loop_turn(
Ok(())
}
fn on_task(
task: Task,
msg_sender: &Sender<Message>,
incoming_requests: &mut Incoming,
state: &mut GlobalState,
) {
fn on_task(task: Task, msg_sender: &Sender<Message>, global_state: &mut GlobalState) {
match task {
Task::Respond(response) => {
if let Some((method, start)) = incoming_requests.complete(response.id.clone()) {
if let Some((method, start)) =
global_state.req_queue.incoming.complete(response.id.clone())
{
let duration = start.elapsed();
log::info!("handled req#{} in {:?}", response.id, duration);
state.complete_request(RequestMetrics {
global_state.complete_request(RequestMetrics {
id: response.id.clone(),
method: method.to_string(),
duration,
@ -395,13 +362,12 @@ fn on_task(
Task::Notify(n) => {
msg_sender.send(n.into()).unwrap();
}
Task::Diagnostic(task) => on_diagnostic_task(task, msg_sender, state),
Task::Diagnostic(task) => on_diagnostic_task(task, msg_sender, global_state),
}
}
fn on_request(
global_state: &mut GlobalState,
incoming_requests: &mut Incoming,
pool: &ThreadPool,
task_sender: &Sender<Task>,
msg_sender: &Sender<Message>,
@ -414,7 +380,6 @@ fn on_request(
global_state,
task_sender,
msg_sender,
incoming_requests,
request_received,
};
pool_dispatcher
@ -469,7 +434,6 @@ fn on_request(
fn on_notification(
msg_sender: &Sender<Message>,
global_state: &mut GlobalState,
loop_state: &mut LoopState,
not: Notification,
) -> Result<()> {
let not = match notification_cast::<lsp_types::notification::Cancel>(not) {
@ -478,7 +442,7 @@ fn on_notification(
NumberOrString::Number(id) => id.into(),
NumberOrString::String(id) => id.into(),
};
if let Some(response) = loop_state.req_queue.incoming.cancel(id) {
if let Some(response) = global_state.req_queue.incoming.cancel(id) {
msg_sender.send(response.into()).unwrap()
}
return Ok(());
@ -488,7 +452,7 @@ fn on_notification(
let not = match notification_cast::<lsp_types::notification::DidOpenTextDocument>(not) {
Ok(params) => {
if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
if !loop_state.mem_docs.insert(path.clone()) {
if !global_state.mem_docs.insert(path.clone()) {
log::error!("duplicate DidOpenTextDocument: {}", path)
}
global_state
@ -504,7 +468,7 @@ fn on_notification(
let not = match notification_cast::<lsp_types::notification::DidChangeTextDocument>(not) {
Ok(params) => {
if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
assert!(loop_state.mem_docs.contains(&path));
assert!(global_state.mem_docs.contains(&path));
let vfs = &mut global_state.vfs.write().0;
let file_id = vfs.file_id(&path).unwrap();
let mut text = String::from_utf8(vfs.file_contents(file_id).to_vec()).unwrap();
@ -518,7 +482,7 @@ fn on_notification(
let not = match notification_cast::<lsp_types::notification::DidCloseTextDocument>(not) {
Ok(params) => {
if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
if !loop_state.mem_docs.remove(&path) {
if !global_state.mem_docs.remove(&path) {
log::error!("orphan DidCloseTextDocument: {}", path)
}
if let Some(path) = path.as_path() {
@ -549,7 +513,7 @@ fn on_notification(
Ok(_) => {
// As stated in https://github.com/microsoft/language-server-protocol/issues/676,
// this notification's parameters should be ignored and the actual config queried separately.
let request = loop_state.req_queue.outgoing.register(
let request = global_state.req_queue.outgoing.register(
lsp_types::request::WorkspaceConfiguration::METHOD.to_string(),
lsp_types::ConfigurationParams {
items: vec![lsp_types::ConfigurationItem {
@ -732,7 +696,7 @@ fn on_diagnostic_task(task: DiagnosticTask, msg_sender: &Sender<Message>, state:
}
fn report_progress(
loop_state: &mut LoopState,
global_state: &mut GlobalState,
sender: &Sender<Message>,
done: usize,
total: usize,
@ -742,7 +706,7 @@ fn report_progress(
let message = Some(format!("{}/{} {}", done, total, message));
let percentage = Some(100.0 * done as f64 / total.max(1) as f64);
let work_done_progress = if done == 0 {
let work_done_progress_create = loop_state.req_queue.outgoing.register(
let work_done_progress_create = global_state.req_queue.outgoing.register(
lsp_types::request::WorkDoneProgressCreate::METHOD.to_string(),
lsp_types::WorkDoneProgressCreateParams { token: token.clone() },
DO_NOTHING,
@ -777,7 +741,6 @@ struct PoolDispatcher<'a> {
req: Option<Request>,
pool: &'a ThreadPool,
global_state: &'a mut GlobalState,
incoming_requests: &'a mut Incoming,
msg_sender: &'a Sender<Message>,
task_sender: &'a Sender<Task>,
request_received: Instant,
@ -806,7 +769,7 @@ impl<'a> PoolDispatcher<'a> {
result_to_task::<R>(id, result)
})
.map_err(|_| format!("sync task {:?} panicked", R::METHOD))?;
on_task(task, self.msg_sender, self.incoming_requests, self.global_state);
on_task(task, self.msg_sender, self.global_state);
Ok(self)
}
@ -853,7 +816,10 @@ impl<'a> PoolDispatcher<'a> {
return None;
}
};
self.incoming_requests.register(id.clone(), (R::METHOD, self.request_received));
self.global_state
.req_queue
.incoming
.register(id.clone(), (R::METHOD, self.request_received));
Some((id, params))
}