diff --git a/Cargo.lock b/Cargo.lock
index d5a67c40930..6cbb1324e2e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -597,6 +597,7 @@ version = "0.0.0"
 dependencies = [
  "cfg",
  "cov-mark",
+ "crossbeam-channel",
  "dot",
  "either",
  "expect-test",
@@ -1367,6 +1368,7 @@ dependencies = [
  "lsp-types",
  "mbe",
  "mimalloc",
+ "num_cpus",
  "oorandom",
  "parking_lot",
  "proc_macro_api",
diff --git a/crates/ide/Cargo.toml b/crates/ide/Cargo.toml
index ae1109a63db..250673d3fe7 100644
--- a/crates/ide/Cargo.toml
+++ b/crates/ide/Cargo.toml
@@ -11,6 +11,7 @@ doctest = false
 
 [dependencies]
 cov-mark = "2.0.0-pre.1"
+crossbeam-channel = "0.5.0"
 either = "1.5.3"
 itertools = "0.10.0"
 tracing = "0.1"
diff --git a/crates/ide/src/lib.rs b/crates/ide/src/lib.rs
index db7b80f71bf..4028b0bc725 100644
--- a/crates/ide/src/lib.rs
+++ b/crates/ide/src/lib.rs
@@ -87,7 +87,7 @@ macro_rules! eprintln {
     moniker::{MonikerKind, MonikerResult, PackageInformation},
     move_item::Direction,
     navigation_target::NavigationTarget,
-    prime_caches::PrimeCachesProgress,
+    prime_caches::ParallelPrimeCachesProgress,
     references::ReferenceSearchResult,
     rename::RenameError,
     runnables::{Runnable, RunnableKind, TestId},
@@ -244,11 +244,11 @@ pub fn status(&self, file_id: Option<FileId>) -> Cancellable<String> {
         self.with_db(|db| status::status(&*db, file_id))
     }
 
-    pub fn prime_caches<F>(&self, cb: F) -> Cancellable<()>
+    pub fn parallel_prime_caches<F>(&self, num_worker_threads: u8, cb: F) -> Cancellable<()>
     where
-        F: Fn(PrimeCachesProgress) + Sync + std::panic::UnwindSafe,
+        F: Fn(ParallelPrimeCachesProgress) + Sync + std::panic::UnwindSafe,
     {
-        self.with_db(move |db| prime_caches::prime_caches(db, &cb))
+        self.with_db(move |db| prime_caches::parallel_prime_caches(db, num_worker_threads, &cb))
     }
 
     /// Gets the text of the source file.
diff --git a/crates/ide/src/prime_caches.rs b/crates/ide/src/prime_caches.rs
index 5eba1d1e276..892b34c7d90 100644
--- a/crates/ide/src/prime_caches.rs
+++ b/crates/ide/src/prime_caches.rs
@@ -2,29 +2,152 @@
 //! sometimes is counter productive when, for example, the first goto definition
 //! request takes longer to compute. This modules implemented prepopulation of
 //! various caches, it's not really advanced at the moment.
+mod topologic_sort;
+
+use std::time::Duration;
 
 use hir::db::DefDatabase;
-use ide_db::base_db::{SourceDatabase, SourceDatabaseExt};
+use ide_db::{
+    base_db::{
+        salsa::{Database, ParallelDatabase, Snapshot},
+        Cancelled, CrateGraph, CrateId, SourceDatabase, SourceDatabaseExt,
+    },
+    FxIndexMap,
+};
 use rustc_hash::FxHashSet;
 
 use crate::RootDatabase;
 
-/// We started indexing a crate.
+/// We're indexing many crates.
 #[derive(Debug)]
-pub struct PrimeCachesProgress {
-    pub on_crate: String,
-    pub n_done: usize,
-    pub n_total: usize,
+pub struct ParallelPrimeCachesProgress {
+    /// the crates that we are currently priming.
+    pub crates_currently_indexing: Vec<String>,
+    /// the total number of crates we want to prime.
+    pub crates_total: usize,
+    /// the total number of crates that have finished priming
+    pub crates_done: usize,
 }
 
-pub(crate) fn prime_caches(db: &RootDatabase, cb: &(dyn Fn(PrimeCachesProgress) + Sync)) {
+pub(crate) fn parallel_prime_caches(
+    db: &RootDatabase,
+    num_worker_threads: u8,
+    cb: &(dyn Fn(ParallelPrimeCachesProgress) + Sync),
+) {
     let _p = profile::span("prime_caches");
+
     let graph = db.crate_graph();
+    let mut crates_to_prime = {
+        let crate_ids = compute_crates_to_prime(db, &graph);
+
+        let mut builder = topologic_sort::TopologicalSortIter::builder();
+
+        for &crate_id in &crate_ids {
+            let crate_data = &graph[crate_id];
+            let dependencies = crate_data
+                .dependencies
+                .iter()
+                .map(|d| d.crate_id)
+                .filter(|i| crate_ids.contains(i));
+
+            builder.add(crate_id, dependencies);
+        }
+
+        builder.build()
+    };
+
+    enum ParallelPrimeCacheWorkerProgress {
+        BeginCrate { crate_id: CrateId, crate_name: String },
+        EndCrate { crate_id: CrateId },
+    }
+
+    let (work_sender, progress_receiver) = {
+        let (progress_sender, progress_receiver) = crossbeam_channel::unbounded();
+        let (work_sender, work_receiver) = crossbeam_channel::unbounded();
+        let prime_caches_worker = move |db: Snapshot<RootDatabase>| {
+            while let Ok((crate_id, crate_name)) = work_receiver.recv() {
+                progress_sender
+                    .send(ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name })?;
+
+                // This also computes the DefMap
+                db.import_map(crate_id);
+
+                progress_sender.send(ParallelPrimeCacheWorkerProgress::EndCrate { crate_id })?;
+            }
+
+            Ok::<_, crossbeam_channel::SendError<_>>(())
+        };
+
+        for _ in 0..num_worker_threads {
+            let worker = prime_caches_worker.clone();
+            let db = db.snapshot();
+            std::thread::spawn(move || Cancelled::catch(|| worker(db)));
+        }
+
+        (work_sender, progress_receiver)
+    };
+
+    let crates_total = crates_to_prime.pending();
+    let mut crates_done = 0;
+
+    // an index map is used to preserve ordering so we can sort the progress report in order of
+    // "longest crate to index" first
+    let mut crates_currently_indexing =
+        FxIndexMap::with_capacity_and_hasher(num_worker_threads as _, Default::default());
+
+    while crates_done < crates_total {
+        db.unwind_if_cancelled();
+
+        for crate_id in &mut crates_to_prime {
+            work_sender
+                .send((
+                    crate_id,
+                    graph[crate_id].display_name.as_deref().unwrap_or_default().to_string(),
+                ))
+                .ok();
+        }
+
+        // recv_timeout is somewhat of a hack; we need a way for this thread to check, on a regular
+        // basis, whether the current salsa revision has been cancelled. workers will only exit if
+        // they are processing a task that is cancelled, or if this thread exits and closes the work channel.
+        let worker_progress = match progress_receiver.recv_timeout(Duration::from_millis(10)) {
+            Ok(p) => p,
+            Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
+                continue;
+            }
+            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
+                // our workers may have died from a cancelled task, so we'll check and re-raise here.
+                db.unwind_if_cancelled();
+                break;
+            }
+        };
+        match worker_progress {
+            ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
+                crates_currently_indexing.insert(crate_id, crate_name);
+            }
+            ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => {
+                crates_currently_indexing.remove(&crate_id);
+                crates_to_prime.mark_done(crate_id);
+                crates_done += 1;
+            }
+        };
+
+        let progress = ParallelPrimeCachesProgress {
+            crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
+            crates_done,
+            crates_total,
+        };
+
+        cb(progress);
+    }
+}
+
+fn compute_crates_to_prime(db: &RootDatabase, graph: &CrateGraph) -> FxHashSet<CrateId> {
     // We're only interested in the workspace crates and the `ImportMap`s of their direct
     // dependencies, though in practice the latter also compute the `DefMap`s.
     // We don't prime transitive dependencies because they're generally not visible in
     // the current workspace.
-    let to_prime: FxHashSet<_> = graph
+    graph
         .iter()
         .filter(|&id| {
             let file_id = graph[id].root_file_id;
@@ -32,17 +155,5 @@ pub(crate) fn prime_caches(db: &RootDatabase, cb: &(dyn Fn(PrimeCachesProgress)
             !db.source_root(root_id).is_library
         })
         .flat_map(|id| graph[id].dependencies.iter().map(|krate| krate.crate_id))
-        .collect();
-
-    // FIXME: This would be easy to parallelize, since it's in the ideal ordering for that.
-    // Unfortunately rayon prevents panics from propagation out of a `scope`, which breaks
-    // cancellation, so we cannot use rayon.
-    let n_total = to_prime.len();
-    for (n_done, &crate_id) in to_prime.iter().enumerate() {
-        let crate_name = graph[crate_id].display_name.as_deref().unwrap_or_default().to_string();
-
-        cb(PrimeCachesProgress { on_crate: crate_name, n_done, n_total });
-        // This also computes the DefMap
-        db.import_map(crate_id);
-    }
+        .collect()
 }
diff --git a/crates/ide/src/prime_caches/topologic_sort.rs b/crates/ide/src/prime_caches/topologic_sort.rs
new file mode 100644
index 00000000000..b04087fa7bd
--- /dev/null
+++ b/crates/ide/src/prime_caches/topologic_sort.rs
@@ -0,0 +1,98 @@
+//! helper data structure to schedule work for parallel prime caches.
+use std::{collections::VecDeque, hash::Hash};
+
+use rustc_hash::FxHashMap;
+
+pub(crate) struct TopologicSortIterBuilder<T> {
+    nodes: FxHashMap<T, Entry<T>>,
+}
+
+impl<T> TopologicSortIterBuilder<T>
+where
+    T: Copy + Eq + PartialEq + Hash,
+{
+    fn new() -> Self {
+        Self { nodes: Default::default() }
+    }
+
+    fn get_or_create_entry(&mut self, item: T) -> &mut Entry<T> {
+        self.nodes.entry(item).or_default()
+    }
+
+    pub(crate) fn add(&mut self, item: T, predecessors: impl IntoIterator<Item = T>) {
+        let mut num_predecessors = 0;
+
+        for predecessor in predecessors.into_iter() {
+            self.get_or_create_entry(predecessor).successors.push(item);
+            num_predecessors += 1;
+        }
+
+        let entry = self.get_or_create_entry(item);
+        entry.num_predecessors += num_predecessors;
+    }
+
+    pub(crate) fn build(self) -> TopologicalSortIter<T> {
+        let ready = self
+            .nodes
+            .iter()
+            .filter_map(
+                |(item, entry)| if entry.num_predecessors == 0 { Some(*item) } else { None },
+            )
+            .collect();
+
+        TopologicalSortIter { nodes: self.nodes, ready }
+    }
+}
+
+pub(crate) struct TopologicalSortIter<T> {
+    ready: VecDeque<T>,
+    nodes: FxHashMap<T, Entry<T>>,
+}
+
+impl<T> TopologicalSortIter<T>
+where
+    T: Copy + Eq + PartialEq + Hash,
+{
+    pub(crate) fn builder() -> TopologicSortIterBuilder<T> {
+        TopologicSortIterBuilder::new()
+    }
+
+    pub(crate) fn pending(&self) -> usize {
+        self.nodes.len()
+    }
+
+    pub(crate) fn mark_done(&mut self, item: T) {
+        let entry = self.nodes.remove(&item).expect("invariant: unknown item marked as done");
+
+        for successor in entry.successors {
+            let succ_entry = self
+                .nodes
+                .get_mut(&successor)
+                .expect("invariant: unknown successor referenced by entry");
+
+            succ_entry.num_predecessors -= 1;
+            if succ_entry.num_predecessors == 0 {
+                self.ready.push_back(successor);
+            }
+        }
+    }
+}
+
+impl<T> Iterator for TopologicalSortIter<T> {
+    type Item = T;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.ready.pop_front()
+    }
+}
+
+struct Entry<T> {
+    successors: Vec<T>,
+    num_predecessors: usize,
+}
+
+impl<T> Default for Entry<T> {
+    fn default() -> Self {
+        Self { successors: Default::default(), num_predecessors: 0 }
+    }
+}
diff --git a/crates/rust-analyzer/Cargo.toml b/crates/rust-analyzer/Cargo.toml
index 0ab866a707e..014001397de 100644
--- a/crates/rust-analyzer/Cargo.toml
+++ b/crates/rust-analyzer/Cargo.toml
@@ -31,6 +31,7 @@ serde = { version = "1.0.106", features = ["derive"] }
 serde_json = { version = "1.0.48", features = ["preserve_order"] }
 threadpool = "1.7.1"
 rayon = "1.5"
+num_cpus = "1.13.1"
 mimalloc = { version = "0.1.19", default-features = false, optional = true }
 lsp-server = "0.5.1"
 tracing = "0.1"
diff --git a/crates/rust-analyzer/src/cli/load_cargo.rs b/crates/rust-analyzer/src/cli/load_cargo.rs
index 19ce86e3ffa..490aef50f3e 100644
--- a/crates/rust-analyzer/src/cli/load_cargo.rs
+++ b/crates/rust-analyzer/src/cli/load_cargo.rs
@@ -88,7 +88,7 @@ pub fn load_workspace(
         load_crate_graph(crate_graph, project_folders.source_root_config, &mut vfs, &receiver);
 
     if load_config.prefill_caches {
-        host.analysis().prime_caches(|_| {})?;
+        host.analysis().parallel_prime_caches(1, |_| {})?;
     }
     Ok((host, vfs, proc_macro_client))
 }
diff --git a/crates/rust-analyzer/src/config.rs b/crates/rust-analyzer/src/config.rs
index 1df19ffe780..76b72707974 100644
--- a/crates/rust-analyzer/src/config.rs
+++ b/crates/rust-analyzer/src/config.rs
@@ -298,6 +298,9 @@ struct ConfigData {
     /// Whether to show `can't find Cargo.toml` error message.
     notifications_cargoTomlNotFound: bool = "true",
 
+    /// How many worker threads to handle priming caches. The default `0` means to pick automatically.
+    primeCaches_numThreads: ParallelPrimeCachesNumThreads = "0",
+
     /// Enable support for procedural macros, implies `#rust-analyzer.cargo.runBuildScripts#`.
     procMacro_enable: bool = "true",
     /// Internal config, path to proc-macro server executable (typically,
@@ -1016,6 +1019,13 @@ pub fn highlight_related(&self) -> HighlightRelatedConfig {
             yield_points: self.data.highlightRelated_yieldPoints,
         }
     }
+
+    pub fn prime_caches_num_threads(&self) -> u8 {
+        match self.data.primeCaches_numThreads {
+            0 => num_cpus::get_physical().try_into().unwrap_or(u8::MAX),
+            n => n,
+        }
+    }
 }
 
 #[derive(Deserialize, Debug, Clone, Copy)]
@@ -1130,6 +1140,8 @@ enum WorkspaceSymbolSearchKindDef {
     AllSymbols,
 }
 
+type ParallelPrimeCachesNumThreads = u8;
+
 macro_rules! _config_data {
     (struct $name:ident {
         $(
@@ -1351,6 +1363,11 @@ macro_rules! set {
                 "Search for all symbols kinds"
             ],
         },
+        "ParallelPrimeCachesNumThreads" => set! {
+            "type": "number",
+            "minimum": 0,
+            "maximum": 255
+        },
         _ => panic!("{}: {}", ty, default),
     }
diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs
index 830b77f3918..5bec301736a 100644
--- a/crates/rust-analyzer/src/main_loop.rs
+++ b/crates/rust-analyzer/src/main_loop.rs
@@ -70,7 +70,7 @@ pub(crate) enum Task {
 #[derive(Debug)]
 pub(crate) enum PrimeCachesProgress {
     Begin,
-    Report(ide::PrimeCachesProgress),
+    Report(ide::ParallelPrimeCachesProgress),
     End { cancelled: bool },
 }
 
@@ -291,11 +291,23 @@ fn handle_event(&mut self, event: Event) -> Result<()> {
                     }
                     PrimeCachesProgress::Report(report) => {
                         state = Progress::Report;
-                        message = Some(format!(
-                            "{}/{} ({})",
-                            report.n_done, report.n_total, report.on_crate
-                        ));
-                        fraction = Progress::fraction(report.n_done, report.n_total);
+
+                        message = match &report.crates_currently_indexing[..] {
+                            [crate_name] => Some(format!(
+                                "{}/{} ({})",
+                                report.crates_done, report.crates_total, crate_name
+                            )),
+                            [crate_name, rest @ ..] => Some(format!(
+                                "{}/{} ({} + {} more)",
+                                report.crates_done,
+                                report.crates_total,
+                                crate_name,
+                                rest.len()
+                            )),
+                            _ => None,
+                        };
+
+                        fraction = Progress::fraction(report.crates_done, report.crates_total);
                     }
                     PrimeCachesProgress::End { cancelled } => {
                         state = Progress::End;
@@ -493,11 +505,13 @@ fn handle_event(&mut self, event: Event) -> Result<()> {
                 self.fetch_build_data();
             }
             if self.prime_caches_queue.should_start_op() {
+                let num_worker_threads = self.config.prime_caches_num_threads();
+
                 self.task_pool.handle.spawn_with_sender({
                     let analysis = self.snapshot().analysis;
                     move |sender| {
                         sender.send(Task::PrimeCaches(PrimeCachesProgress::Begin)).unwrap();
-                        let res = analysis.prime_caches(|progress| {
+                        let res = analysis.parallel_prime_caches(num_worker_threads, |progress| {
                             let report = PrimeCachesProgress::Report(progress);
                             sender.send(Task::PrimeCaches(report)).unwrap();
                         });
diff --git a/docs/user/generated_config.adoc b/docs/user/generated_config.adoc
index f7a533c7c23..b10b0d35522 100644
--- a/docs/user/generated_config.adoc
+++ b/docs/user/generated_config.adoc
@@ -454,6 +454,11 @@ Number of syntax trees rust-analyzer keeps in memory. Defaults to 128.
 --
 Whether to show `can't find Cargo.toml` error message.
 --
+[[rust-analyzer.primeCaches.numThreads]]rust-analyzer.primeCaches.numThreads (default: `0`)::
++
+--
+How many worker threads to handle priming caches. The default `0` means to pick automatically.
+--
 [[rust-analyzer.procMacro.enable]]rust-analyzer.procMacro.enable (default: `true`)::
 +
 --
diff --git a/editors/code/package.json b/editors/code/package.json
index ed81cb52403..8f4157da0d1 100644
--- a/editors/code/package.json
+++ b/editors/code/package.json
@@ -880,6 +880,13 @@
                     "default": true,
                     "type": "boolean"
                 },
+                "rust-analyzer.primeCaches.numThreads": {
+                    "markdownDescription": "How many worker threads to handle priming caches. The default `0` means to pick automatically.",
+                    "default": 0,
+                    "type": "number",
+                    "minimum": 0,
+                    "maximum": 255
+                },
                 "rust-analyzer.procMacro.enable": {
                     "markdownDescription": "Enable support for procedural macros, implies `#rust-analyzer.cargo.runBuildScripts#`.",
                     "default": true,
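
For readers following along, here is a minimal, illustrative sketch (not part of the patch) of how the `TopologicalSortIter` added above is meant to be driven. It assumes it lives in the same crate as `topologic_sort.rs`, for example as a test module appended to `prime_caches.rs`; the numeric "crate ids" and the test name are made up for the example.

```rust
// Hypothetical test module; `super::topologic_sort` refers to the module added by this patch.
#[cfg(test)]
mod topologic_sort_example {
    use super::topologic_sort::TopologicalSortIter;

    #[test]
    fn dependencies_are_released_before_dependents() {
        // Made-up graph: crates 1 and 2 have no dependencies, crate 3 depends on both.
        let graph: &[(u32, &[u32])] = &[(1, &[]), (2, &[]), (3, &[1, 2])];

        let mut builder = TopologicalSortIter::builder();
        for &(krate, deps) in graph {
            builder.add(krate, deps.iter().copied());
        }
        let mut topo = builder.build();
        assert_eq!(topo.pending(), 3);

        // Initially only the dependency-free crates are handed out.
        let ready: Vec<u32> = (&mut topo).collect();
        assert_eq!(ready.len(), 2);
        assert!(!ready.contains(&3));

        // Crate 3 becomes ready only once both of its dependencies are marked done,
        // mirroring how parallel_prime_caches feeds the work channel.
        topo.mark_done(ready[0]);
        assert_eq!(topo.next(), None);
        topo.mark_done(ready[1]);
        assert_eq!(topo.next(), Some(3));
    }
}
```

This release-on-`mark_done` flow is what lets the main loop keep sending ready crates to the worker threads without ever handing out a crate before its dependencies have been indexed.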