move completed requests to a separate file
This commit is contained in:
parent
838915c9a2
commit
e1bda6aeda
@ -1,5 +1,6 @@
|
|||||||
mod handlers;
|
mod handlers;
|
||||||
mod subscriptions;
|
mod subscriptions;
|
||||||
|
pub(crate) mod pending_requests;
|
||||||
|
|
||||||
use std::{fmt, path::PathBuf, sync::Arc, time::Instant, any::TypeId};
|
use std::{fmt, path::PathBuf, sync::Arc, time::Instant, any::TypeId};
|
||||||
|
|
||||||
@ -12,16 +13,18 @@ use gen_lsp_server::{
|
|||||||
use lsp_types::NumberOrString;
|
use lsp_types::NumberOrString;
|
||||||
use ra_ide_api::{Canceled, FileId, LibraryData};
|
use ra_ide_api::{Canceled, FileId, LibraryData};
|
||||||
use ra_vfs::VfsTask;
|
use ra_vfs::VfsTask;
|
||||||
use rustc_hash::FxHashMap;
|
|
||||||
use serde::{de::DeserializeOwned, Serialize};
|
use serde::{de::DeserializeOwned, Serialize};
|
||||||
use threadpool::ThreadPool;
|
use threadpool::ThreadPool;
|
||||||
use ra_prof::profile;
|
use ra_prof::profile;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
main_loop::subscriptions::Subscriptions,
|
main_loop::{
|
||||||
|
subscriptions::Subscriptions,
|
||||||
|
pending_requests::{PendingRequests, PendingRequest},
|
||||||
|
},
|
||||||
project_model::workspace_loader,
|
project_model::workspace_loader,
|
||||||
req,
|
req,
|
||||||
server_world::{ServerWorld, ServerWorldState, CompletedRequest},
|
server_world::{ServerWorld, ServerWorldState},
|
||||||
Result,
|
Result,
|
||||||
InitializationOptions,
|
InitializationOptions,
|
||||||
};
|
};
|
||||||
@ -42,37 +45,12 @@ impl LspError {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
enum Task {
|
|
||||||
Respond(RawResponse),
|
|
||||||
Notify(RawNotification),
|
|
||||||
}
|
|
||||||
|
|
||||||
struct PendingRequest {
|
|
||||||
id: u64,
|
|
||||||
received: Instant,
|
|
||||||
method: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<PendingRequest> for CompletedRequest {
|
|
||||||
fn from(pending: PendingRequest) -> CompletedRequest {
|
|
||||||
CompletedRequest {
|
|
||||||
id: pending.id,
|
|
||||||
method: pending.method,
|
|
||||||
duration: pending.received.elapsed(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn main_loop(
|
pub fn main_loop(
|
||||||
ws_roots: Vec<PathBuf>,
|
ws_roots: Vec<PathBuf>,
|
||||||
options: InitializationOptions,
|
options: InitializationOptions,
|
||||||
msg_receiver: &Receiver<RawMessage>,
|
msg_receiver: &Receiver<RawMessage>,
|
||||||
msg_sender: &Sender<RawMessage>,
|
msg_sender: &Sender<RawMessage>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let pool = ThreadPool::new(THREADPOOL_SIZE);
|
|
||||||
let (task_sender, task_receiver) = unbounded::<Task>();
|
|
||||||
|
|
||||||
// FIXME: support dynamic workspace loading.
|
// FIXME: support dynamic workspace loading.
|
||||||
let workspaces = {
|
let workspaces = {
|
||||||
let ws_worker = workspace_loader();
|
let ws_worker = workspace_loader();
|
||||||
@ -97,10 +75,12 @@ pub fn main_loop(
|
|||||||
|
|
||||||
let mut state = ServerWorldState::new(ws_roots, workspaces);
|
let mut state = ServerWorldState::new(ws_roots, workspaces);
|
||||||
|
|
||||||
log::info!("server initialized, serving requests");
|
let pool = ThreadPool::new(THREADPOOL_SIZE);
|
||||||
|
let (task_sender, task_receiver) = unbounded::<Task>();
|
||||||
|
let mut pending_requests = PendingRequests::default();
|
||||||
|
let mut subs = Subscriptions::default();
|
||||||
|
|
||||||
let mut pending_requests = FxHashMap::default();
|
log::info!("server initialized, serving requests");
|
||||||
let mut subs = Subscriptions::new();
|
|
||||||
let main_res = main_loop_inner(
|
let main_res = main_loop_inner(
|
||||||
options,
|
options,
|
||||||
&pool,
|
&pool,
|
||||||
@ -128,6 +108,12 @@ pub fn main_loop(
|
|||||||
main_res
|
main_res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum Task {
|
||||||
|
Respond(RawResponse),
|
||||||
|
Notify(RawNotification),
|
||||||
|
}
|
||||||
|
|
||||||
enum Event {
|
enum Event {
|
||||||
Msg(RawMessage),
|
Msg(RawMessage),
|
||||||
Task(Task),
|
Task(Task),
|
||||||
@ -178,7 +164,7 @@ fn main_loop_inner(
|
|||||||
task_sender: Sender<Task>,
|
task_sender: Sender<Task>,
|
||||||
task_receiver: Receiver<Task>,
|
task_receiver: Receiver<Task>,
|
||||||
state: &mut ServerWorldState,
|
state: &mut ServerWorldState,
|
||||||
pending_requests: &mut FxHashMap<u64, PendingRequest>,
|
pending_requests: &mut PendingRequests,
|
||||||
subs: &mut Subscriptions,
|
subs: &mut Subscriptions,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
// We try not to index more than MAX_IN_FLIGHT_LIBS libraries at the same
|
// We try not to index more than MAX_IN_FLIGHT_LIBS libraries at the same
|
||||||
@ -202,15 +188,16 @@ fn main_loop_inner(
|
|||||||
},
|
},
|
||||||
recv(libdata_receiver) -> data => Event::Lib(data.unwrap())
|
recv(libdata_receiver) -> data => Event::Lib(data.unwrap())
|
||||||
};
|
};
|
||||||
// NOTE: don't count blocking select! call as a loop-turn time
|
|
||||||
let _p = profile("main_loop_inner/loop-turn");
|
|
||||||
let loop_start = Instant::now();
|
let loop_start = Instant::now();
|
||||||
|
|
||||||
|
// NOTE: don't count blocking select! call as a loop-turn time
|
||||||
|
let _p = profile("main_loop_inner/loop-turn");
|
||||||
log::info!("loop turn = {:?}", event);
|
log::info!("loop turn = {:?}", event);
|
||||||
let queue_count = pool.queued_count();
|
let queue_count = pool.queued_count();
|
||||||
if queue_count > 0 {
|
if queue_count > 0 {
|
||||||
log::info!("queued count = {}", queue_count);
|
log::info!("queued count = {}", queue_count);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut state_changed = false;
|
let mut state_changed = false;
|
||||||
match event {
|
match event {
|
||||||
Event::Task(task) => {
|
Event::Task(task) => {
|
||||||
@ -311,13 +298,12 @@ fn main_loop_inner(
|
|||||||
fn on_task(
|
fn on_task(
|
||||||
task: Task,
|
task: Task,
|
||||||
msg_sender: &Sender<RawMessage>,
|
msg_sender: &Sender<RawMessage>,
|
||||||
pending_requests: &mut FxHashMap<u64, PendingRequest>,
|
pending_requests: &mut PendingRequests,
|
||||||
state: &mut ServerWorldState,
|
state: &mut ServerWorldState,
|
||||||
) {
|
) {
|
||||||
match task {
|
match task {
|
||||||
Task::Respond(response) => {
|
Task::Respond(response) => {
|
||||||
if let Some(pending) = pending_requests.remove(&response.id) {
|
if let Some(completed) = pending_requests.finish(response.id) {
|
||||||
let completed = CompletedRequest::from(pending);
|
|
||||||
log::info!("handled req#{} in {:?}", completed.id, completed.duration);
|
log::info!("handled req#{} in {:?}", completed.id, completed.duration);
|
||||||
state.complete_request(completed);
|
state.complete_request(completed);
|
||||||
msg_sender.send(response.into()).unwrap();
|
msg_sender.send(response.into()).unwrap();
|
||||||
@ -331,7 +317,7 @@ fn on_task(
|
|||||||
|
|
||||||
fn on_request(
|
fn on_request(
|
||||||
world: &mut ServerWorldState,
|
world: &mut ServerWorldState,
|
||||||
pending_requests: &mut FxHashMap<u64, PendingRequest>,
|
pending_requests: &mut PendingRequests,
|
||||||
pool: &ThreadPool,
|
pool: &ThreadPool,
|
||||||
sender: &Sender<Task>,
|
sender: &Sender<Task>,
|
||||||
request_received: Instant,
|
request_received: Instant,
|
||||||
@ -371,9 +357,7 @@ fn on_request(
|
|||||||
.finish();
|
.finish();
|
||||||
match req {
|
match req {
|
||||||
Ok(id) => {
|
Ok(id) => {
|
||||||
let prev = pending_requests
|
pending_requests.start(PendingRequest { id, method, received: request_received });
|
||||||
.insert(id, PendingRequest { id, method, received: request_received });
|
|
||||||
assert!(prev.is_none(), "duplicate request: {}", id);
|
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
Err(req) => Ok(Some(req)),
|
Err(req) => Ok(Some(req)),
|
||||||
@ -383,7 +367,7 @@ fn on_request(
|
|||||||
fn on_notification(
|
fn on_notification(
|
||||||
msg_sender: &Sender<RawMessage>,
|
msg_sender: &Sender<RawMessage>,
|
||||||
state: &mut ServerWorldState,
|
state: &mut ServerWorldState,
|
||||||
pending_requests: &mut FxHashMap<u64, PendingRequest>,
|
pending_requests: &mut PendingRequests,
|
||||||
subs: &mut Subscriptions,
|
subs: &mut Subscriptions,
|
||||||
not: RawNotification,
|
not: RawNotification,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
@ -395,7 +379,7 @@ fn on_notification(
|
|||||||
panic!("string id's not supported: {:?}", id);
|
panic!("string id's not supported: {:?}", id);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if pending_requests.remove(&id).is_some() {
|
if pending_requests.cancel(id) {
|
||||||
let response = RawResponse::err(
|
let response = RawResponse::err(
|
||||||
id,
|
id,
|
||||||
ErrorCode::RequestCanceled as i32,
|
ErrorCode::RequestCanceled as i32,
|
||||||
|
@ -31,10 +31,10 @@ use crate::{
|
|||||||
pub fn handle_analyzer_status(world: ServerWorld, _: ()) -> Result<String> {
|
pub fn handle_analyzer_status(world: ServerWorld, _: ()) -> Result<String> {
|
||||||
let mut buf = world.status();
|
let mut buf = world.status();
|
||||||
writeln!(buf, "\n\nrequests:").unwrap();
|
writeln!(buf, "\n\nrequests:").unwrap();
|
||||||
let requests = world.latest_completed_requests.read();
|
let requests = world.latest_requests.read();
|
||||||
for (idx, r) in requests.iter().enumerate() {
|
for (is_last, r) in requests.iter() {
|
||||||
let current = if idx == world.request_idx { "*" } else { " " };
|
let mark = if is_last { "*" } else { " " };
|
||||||
writeln!(buf, "{:4}{}{:<36}{}ms", r.id, current, r.method, r.duration.as_millis()).unwrap();
|
writeln!(buf, "{}{:4} {:<36}{}ms", mark, r.id, r.method, r.duration.as_millis()).unwrap();
|
||||||
}
|
}
|
||||||
Ok(buf)
|
Ok(buf)
|
||||||
}
|
}
|
||||||
|
72
crates/ra_lsp_server/src/main_loop/pending_requests.rs
Normal file
72
crates/ra_lsp_server/src/main_loop/pending_requests.rs
Normal file
@ -0,0 +1,72 @@
|
|||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use rustc_hash::FxHashMap;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct CompletedRequest {
|
||||||
|
pub id: u64,
|
||||||
|
pub method: String,
|
||||||
|
pub duration: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(crate) struct PendingRequest {
|
||||||
|
pub(crate) id: u64,
|
||||||
|
pub(crate) method: String,
|
||||||
|
pub(crate) received: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<PendingRequest> for CompletedRequest {
|
||||||
|
fn from(pending: PendingRequest) -> CompletedRequest {
|
||||||
|
CompletedRequest {
|
||||||
|
id: pending.id,
|
||||||
|
method: pending.method,
|
||||||
|
duration: pending.received.elapsed(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub(crate) struct PendingRequests {
|
||||||
|
map: FxHashMap<u64, PendingRequest>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PendingRequests {
|
||||||
|
pub(crate) fn start(&mut self, request: PendingRequest) {
|
||||||
|
let id = request.id;
|
||||||
|
let prev = self.map.insert(id, request);
|
||||||
|
assert!(prev.is_none(), "duplicate request with id {}", id);
|
||||||
|
}
|
||||||
|
pub(crate) fn cancel(&mut self, id: u64) -> bool {
|
||||||
|
self.map.remove(&id).is_some()
|
||||||
|
}
|
||||||
|
pub(crate) fn finish(&mut self, id: u64) -> Option<CompletedRequest> {
|
||||||
|
self.map.remove(&id).map(CompletedRequest::from)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const N_COMPLETED_REQUESTS: usize = 10;
|
||||||
|
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub struct LatestRequests {
|
||||||
|
// hand-rolling VecDeque here to print things in a nicer way
|
||||||
|
buf: [Option<CompletedRequest>; N_COMPLETED_REQUESTS],
|
||||||
|
idx: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LatestRequests {
|
||||||
|
pub(crate) fn record(&mut self, request: CompletedRequest) {
|
||||||
|
// special case: don't track status request itself
|
||||||
|
if request.method == "rust-analyzer/analyzerStatus" {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let idx = self.idx;
|
||||||
|
self.buf[idx] = Some(request);
|
||||||
|
self.idx = (idx + 1) % N_COMPLETED_REQUESTS;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn iter(&self) -> impl Iterator<Item = (bool, &CompletedRequest)> {
|
||||||
|
let idx = self.idx;
|
||||||
|
self.buf.iter().enumerate().filter_map(move |(i, req)| Some((i == idx, req.as_ref()?)))
|
||||||
|
}
|
||||||
|
}
|
@ -1,21 +1,19 @@
|
|||||||
use ra_ide_api::FileId;
|
use ra_ide_api::FileId;
|
||||||
use rustc_hash::FxHashSet;
|
use rustc_hash::FxHashSet;
|
||||||
|
|
||||||
pub struct Subscriptions {
|
#[derive(Default)]
|
||||||
|
pub(crate) struct Subscriptions {
|
||||||
subs: FxHashSet<FileId>,
|
subs: FxHashSet<FileId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Subscriptions {
|
impl Subscriptions {
|
||||||
pub fn new() -> Subscriptions {
|
pub(crate) fn add_sub(&mut self, file_id: FileId) {
|
||||||
Subscriptions { subs: FxHashSet::default() }
|
|
||||||
}
|
|
||||||
pub fn add_sub(&mut self, file_id: FileId) {
|
|
||||||
self.subs.insert(file_id);
|
self.subs.insert(file_id);
|
||||||
}
|
}
|
||||||
pub fn remove_sub(&mut self, file_id: FileId) {
|
pub(crate) fn remove_sub(&mut self, file_id: FileId) {
|
||||||
self.subs.remove(&file_id);
|
self.subs.remove(&file_id);
|
||||||
}
|
}
|
||||||
pub fn subscriptions(&self) -> Vec<FileId> {
|
pub(crate) fn subscriptions(&self) -> Vec<FileId> {
|
||||||
self.subs.iter().cloned().collect()
|
self.subs.iter().cloned().collect()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
use std::{
|
use std::{
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::Duration,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
use lsp_types::Url;
|
use lsp_types::Url;
|
||||||
@ -16,6 +15,7 @@ use failure::{Error, format_err};
|
|||||||
use gen_lsp_server::ErrorCode;
|
use gen_lsp_server::ErrorCode;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
main_loop::pending_requests::{CompletedRequest, LatestRequests},
|
||||||
project_model::ProjectWorkspace,
|
project_model::ProjectWorkspace,
|
||||||
vfs_filter::IncludeRustFiles,
|
vfs_filter::IncludeRustFiles,
|
||||||
Result,
|
Result,
|
||||||
@ -29,26 +29,14 @@ pub struct ServerWorldState {
|
|||||||
pub workspaces: Arc<Vec<ProjectWorkspace>>,
|
pub workspaces: Arc<Vec<ProjectWorkspace>>,
|
||||||
pub analysis_host: AnalysisHost,
|
pub analysis_host: AnalysisHost,
|
||||||
pub vfs: Arc<RwLock<Vfs>>,
|
pub vfs: Arc<RwLock<Vfs>>,
|
||||||
// hand-rolling VecDeque here to print things in a nicer way
|
pub latest_requests: Arc<RwLock<LatestRequests>>,
|
||||||
pub latest_completed_requests: Arc<RwLock<[CompletedRequest; N_COMPLETED_REQUESTS]>>,
|
|
||||||
pub request_idx: usize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const N_COMPLETED_REQUESTS: usize = 10;
|
|
||||||
|
|
||||||
pub struct ServerWorld {
|
pub struct ServerWorld {
|
||||||
pub workspaces: Arc<Vec<ProjectWorkspace>>,
|
pub workspaces: Arc<Vec<ProjectWorkspace>>,
|
||||||
pub analysis: Analysis,
|
pub analysis: Analysis,
|
||||||
pub vfs: Arc<RwLock<Vfs>>,
|
pub vfs: Arc<RwLock<Vfs>>,
|
||||||
pub latest_completed_requests: Arc<RwLock<[CompletedRequest; N_COMPLETED_REQUESTS]>>,
|
pub latest_requests: Arc<RwLock<LatestRequests>>,
|
||||||
pub request_idx: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
|
||||||
pub struct CompletedRequest {
|
|
||||||
pub id: u64,
|
|
||||||
pub method: String,
|
|
||||||
pub duration: Duration,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServerWorldState {
|
impl ServerWorldState {
|
||||||
@ -88,8 +76,7 @@ impl ServerWorldState {
|
|||||||
workspaces: Arc::new(workspaces),
|
workspaces: Arc::new(workspaces),
|
||||||
analysis_host,
|
analysis_host,
|
||||||
vfs: Arc::new(RwLock::new(vfs)),
|
vfs: Arc::new(RwLock::new(vfs)),
|
||||||
latest_completed_requests: Default::default(),
|
latest_requests: Default::default(),
|
||||||
request_idx: 0,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -158,8 +145,7 @@ impl ServerWorldState {
|
|||||||
workspaces: Arc::clone(&self.workspaces),
|
workspaces: Arc::clone(&self.workspaces),
|
||||||
analysis: self.analysis_host.analysis(),
|
analysis: self.analysis_host.analysis(),
|
||||||
vfs: Arc::clone(&self.vfs),
|
vfs: Arc::clone(&self.vfs),
|
||||||
latest_completed_requests: Arc::clone(&self.latest_completed_requests),
|
latest_requests: Arc::clone(&self.latest_requests),
|
||||||
request_idx: self.request_idx.checked_sub(1).unwrap_or(N_COMPLETED_REQUESTS - 1),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -172,13 +158,7 @@ impl ServerWorldState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn complete_request(&mut self, request: CompletedRequest) {
|
pub fn complete_request(&mut self, request: CompletedRequest) {
|
||||||
// special case: don't track status request itself
|
self.latest_requests.write().record(request)
|
||||||
if request.method == "rust-analyzer/analyzerStatus" {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
let idx = self.request_idx;
|
|
||||||
self.latest_completed_requests.write()[idx] = request;
|
|
||||||
self.request_idx = (idx + 1) % N_COMPLETED_REQUESTS;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user