From 3168148cc626c26658d49f7304ff7f07194baec2 Mon Sep 17 00:00:00 2001
From: Jake Heinz
Date: Fri, 14 Jan 2022 09:11:47 +0000
Subject: [PATCH] ide: parallel prime caches

---
Adds `parallel_prime_caches`, which primes import maps on a pool of worker
threads, handing out crates in dependency order via a small topological-sort
iterator, and switches the main loop's progress reporting over to it.

 Cargo.lock                                    |   2 +
 crates/ide/Cargo.toml                         |   2 +
 crates/ide/src/lib.rs                         |   9 +-
 crates/ide/src/prime_caches.rs                | 161 ++++++++++++++++--
 crates/ide/src/prime_caches/topologic_sort.rs | 101 +++++++++++
 crates/rust-analyzer/src/main_loop.rs         |  26 ++-
 6 files changed, 275 insertions(+), 26 deletions(-)
 create mode 100644 crates/ide/src/prime_caches/topologic_sort.rs

diff --git a/Cargo.lock b/Cargo.lock
index 1dcbc14f014..915da5d2837 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -577,6 +577,8 @@ version = "0.0.0"
 dependencies = [
  "cfg",
  "cov-mark",
+ "crossbeam-channel",
+ "crossbeam-utils",
  "dot",
  "either",
  "expect-test",
diff --git a/crates/ide/Cargo.toml b/crates/ide/Cargo.toml
index c5e79838fc0..8cddc1e8ec2 100644
--- a/crates/ide/Cargo.toml
+++ b/crates/ide/Cargo.toml
@@ -11,6 +11,8 @@ doctest = false
 
 [dependencies]
 cov-mark = "2.0.0-pre.1"
+crossbeam-channel = "0.5.0"
+crossbeam-utils = "0.8.5"
 either = "1.5.3"
 itertools = "0.10.0"
 tracing = "0.1"
diff --git a/crates/ide/src/lib.rs b/crates/ide/src/lib.rs
index db7b80f71bf..5c872fe9c71 100644
--- a/crates/ide/src/lib.rs
+++ b/crates/ide/src/lib.rs
@@ -87,7 +87,7 @@ macro_rules! eprintln {
     moniker::{MonikerKind, MonikerResult, PackageInformation},
     move_item::Direction,
     navigation_target::NavigationTarget,
-    prime_caches::PrimeCachesProgress,
+    prime_caches::{ParallelPrimeCachesProgress, PrimeCachesProgress},
     references::ReferenceSearchResult,
     rename::RenameError,
     runnables::{Runnable, RunnableKind, TestId},
@@ -251,6 +251,13 @@ pub fn prime_caches<F>(&self, cb: F) -> Cancellable<()>
         self.with_db(move |db| prime_caches::prime_caches(db, &cb))
     }
 
+    pub fn parallel_prime_caches<F>(&self, num_worker_threads: u8, cb: F) -> Cancellable<()>
+    where
+        F: Fn(ParallelPrimeCachesProgress) + Sync + std::panic::UnwindSafe,
+    {
+        self.with_db(move |db| prime_caches::parallel_prime_caches(db, num_worker_threads, &cb))
+    }
+
     /// Gets the text of the source file.
     pub fn file_text(&self, file_id: FileId) -> Cancellable<Arc<String>> {
         self.with_db(|db| db.file_text(file_id))
diff --git a/crates/ide/src/prime_caches.rs b/crates/ide/src/prime_caches.rs
index 5eba1d1e276..b873329e861 100644
--- a/crates/ide/src/prime_caches.rs
+++ b/crates/ide/src/prime_caches.rs
@@ -2,10 +2,14 @@
 //! sometimes is counter productive when, for example, the first goto definition
 //! request takes longer to compute. This modules implemented prepopulation of
 //! various caches, it's not really advanced at the moment.
+mod topologic_sort;
 
 use hir::db::DefDatabase;
-use ide_db::base_db::{SourceDatabase, SourceDatabaseExt};
-use rustc_hash::FxHashSet;
+use ide_db::base_db::{
+    salsa::{Database, ParallelDatabase, Snapshot},
+    Cancelled, CrateGraph, CrateId, SourceDatabase, SourceDatabaseExt,
+};
+use rustc_hash::{FxHashMap, FxHashSet};
 
 use crate::RootDatabase;
 
@@ -20,23 +24,8 @@ pub struct PrimeCachesProgress {
 pub(crate) fn prime_caches(db: &RootDatabase, cb: &(dyn Fn(PrimeCachesProgress) + Sync)) {
     let _p = profile::span("prime_caches");
     let graph = db.crate_graph();
-    // We're only interested in the workspace crates and the `ImportMap`s of their direct
-    // dependencies, though in practice the latter also compute the `DefMap`s.
-    // We don't prime transitive dependencies because they're generally not visible in
-    // the current workspace.
-    let to_prime: FxHashSet<_> = graph
-        .iter()
-        .filter(|&id| {
-            let file_id = graph[id].root_file_id;
-            let root_id = db.file_source_root(file_id);
-            !db.source_root(root_id).is_library
-        })
-        .flat_map(|id| graph[id].dependencies.iter().map(|krate| krate.crate_id))
-        .collect();
+    let to_prime = compute_crates_to_prime(db, &graph);
 
-    // FIXME: This would be easy to parallelize, since it's in the ideal ordering for that.
-    // Unfortunately rayon prevents panics from propagation out of a `scope`, which breaks
-    // cancellation, so we cannot use rayon.
     let n_total = to_prime.len();
     for (n_done, &crate_id) in to_prime.iter().enumerate() {
         let crate_name = graph[crate_id].display_name.as_deref().unwrap_or_default().to_string();
@@ -46,3 +35,139 @@ pub(crate) fn prime_caches(db: &RootDatabase, cb: &(dyn Fn(PrimeCachesProgress)
         db.import_map(crate_id);
     }
 }
+
+/// We're indexing many crates.
+#[derive(Debug)]
+pub struct ParallelPrimeCachesProgress {
+    /// the crates that we are currently priming.
+    pub crates_currently_indexing: Vec<String>,
+    /// the total number of crates we want to prime.
+    pub crates_total: usize,
+    /// the total number of crates that have finished priming
+    pub crates_done: usize,
+}
+
+pub(crate) fn parallel_prime_caches<F>(db: &RootDatabase, num_worker_threads: u8, cb: &F)
+where
+    F: Fn(ParallelPrimeCachesProgress) + Sync + std::panic::UnwindSafe,
+{
+    let _p = profile::span("prime_caches");
+
+    let graph = db.crate_graph();
+    let mut crates_to_prime = {
+        let crate_ids = compute_crates_to_prime(db, &graph);
+
+        let mut builder = topologic_sort::TopologicalSortIter::builder();
+
+        for &crate_id in &crate_ids {
+            let crate_data = &graph[crate_id];
+            let dependencies = crate_data
+                .dependencies
+                .iter()
+                .map(|d| d.crate_id)
+                .filter(|i| crate_ids.contains(i));
+
+            builder.add(crate_id, dependencies);
+        }
+
+        builder.build()
+    };
+
+    crossbeam_utils::thread::scope(move |s| {
+        let (work_sender, work_receiver) = crossbeam_channel::unbounded();
+        let (progress_sender, progress_receiver) = crossbeam_channel::unbounded();
+
+        enum ParallelPrimeCacheWorkerProgress {
+            BeginCrate { crate_id: CrateId, crate_name: String },
+            EndCrate { crate_id: CrateId, cancelled: bool },
+        }
+
+        let prime_caches_worker = move |db: Snapshot<RootDatabase>| {
+            while let Ok((crate_id, crate_name)) = work_receiver.recv() {
+                progress_sender
+                    .send(ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name })?;
+
+                let cancelled = Cancelled::catch(|| {
+                    // This also computes the DefMap
+                    db.import_map(crate_id);
+                })
+                .is_err();
+
+                progress_sender
+                    .send(ParallelPrimeCacheWorkerProgress::EndCrate { crate_id, cancelled })?;
+
+                if cancelled {
+                    break;
+                }
+            }
+
+            Ok::<_, crossbeam_channel::SendError<_>>(())
+        };
+
+        for _ in 0..num_worker_threads {
+            let worker = prime_caches_worker.clone();
+            let db = db.snapshot();
+            s.spawn(move |_| worker(db));
+        }
+
+        let crates_total = crates_to_prime.len();
+        let mut crates_done = 0;
+
+        let mut is_cancelled = false;
+        let mut crates_currently_indexing =
+            FxHashMap::with_capacity_and_hasher(num_worker_threads as _, Default::default());
+
+        while !crates_to_prime.is_empty() && !is_cancelled {
+            for crate_id in &mut crates_to_prime {
+                work_sender
+                    .send((
+                        crate_id,
+                        graph[crate_id].display_name.as_deref().unwrap_or_default().to_string(),
+                    ))
+                    .ok();
+            }
+
+            let worker_progress = match progress_receiver.recv() {
+                Ok(p) => p,
+                Err(_) => break,
+            };
+            match worker_progress {
+                ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
+                    crates_currently_indexing.insert(crate_id, crate_name);
+                }
+                ParallelPrimeCacheWorkerProgress::EndCrate { crate_id, cancelled } => {
+                    crates_currently_indexing.remove(&crate_id);
+                    crates_to_prime.mark_done(crate_id);
+                    crates_done += 1;
+                    is_cancelled = cancelled;
+                }
+            };
+
+            let progress = ParallelPrimeCachesProgress {
+                crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
+                crates_done,
+                crates_total,
+            };
+
+            cb(progress);
+            db.unwind_if_cancelled();
+        }
+    })
+    .unwrap();
+}
+
+fn compute_crates_to_prime(db: &RootDatabase, graph: &CrateGraph) -> FxHashSet<CrateId> {
+    // We're only interested in the workspace crates and the `ImportMap`s of their direct
+    // dependencies, though in practice the latter also compute the `DefMap`s.
+    // We don't prime transitive dependencies because they're generally not visible in
+    // the current workspace.
+    graph
+        .iter()
+        .filter(|&id| {
+            let file_id = graph[id].root_file_id;
+            let root_id = db.file_source_root(file_id);
+            !db.source_root(root_id).is_library
+        })
+        .flat_map(|id| graph[id].dependencies.iter().map(|krate| krate.crate_id))
+        .collect()
+}
diff --git a/crates/ide/src/prime_caches/topologic_sort.rs b/crates/ide/src/prime_caches/topologic_sort.rs
new file mode 100644
index 00000000000..f89f84bc5b5
--- /dev/null
+++ b/crates/ide/src/prime_caches/topologic_sort.rs
@@ -0,0 +1,101 @@
+use std::{collections::VecDeque, hash::Hash};
+
+use rustc_hash::FxHashMap;
+
+pub struct TopologicSortIterBuilder<T> {
+    nodes: FxHashMap<T, Entry<T>>,
+}
+
+impl<T> TopologicSortIterBuilder<T>
+where
+    T: Copy + Eq + PartialEq + Hash,
+{
+    fn new() -> Self {
+        Self { nodes: Default::default() }
+    }
+
+    fn get_or_create_entry(&mut self, item: T) -> &mut Entry<T> {
+        self.nodes.entry(item).or_default()
+    }
+
+    pub fn add(&mut self, item: T, predecessors: impl IntoIterator<Item = T>) {
+        let mut num_predecessors = 0;
+
+        for predecessor in predecessors.into_iter() {
+            self.get_or_create_entry(predecessor).successors.push(item);
+            num_predecessors += 1;
+        }
+
+        let entry = self.get_or_create_entry(item);
+        entry.num_predecessors += num_predecessors;
+    }
+
+    pub fn build(self) -> TopologicalSortIter<T> {
+        let ready = self
+            .nodes
+            .iter()
+            .filter_map(
+                |(item, entry)| if entry.num_predecessors == 0 { Some(*item) } else { None },
+            )
+            .collect();
+
+        TopologicalSortIter { nodes: self.nodes, ready }
+    }
+}
+
+pub struct TopologicalSortIter<T> {
+    ready: VecDeque<T>,
+    nodes: FxHashMap<T, Entry<T>>,
+}
+
+impl<T> TopologicalSortIter<T>
+where
+    T: Copy + Eq + PartialEq + Hash,
+{
+    pub fn builder() -> TopologicSortIterBuilder<T> {
+        TopologicSortIterBuilder::new()
+    }
+
+    pub fn len(&self) -> usize {
+        self.nodes.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    pub fn mark_done(&mut self, item: T) {
+        let entry = self.nodes.remove(&item).expect("invariant: unknown item marked as done");
+
+        for successor in entry.successors {
+            let succ_entry = self
+                .nodes
+                .get_mut(&successor)
+                .expect("invariant: unknown successor referenced by entry");
+
+            succ_entry.num_predecessors -= 1;
+            if succ_entry.num_predecessors == 0 {
+                self.ready.push_back(successor);
+            }
+        }
+    }
+}
+
+impl<T> Iterator for TopologicalSortIter<T> {
+    type Item = T;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.ready.pop_front()
+    }
+}
+
+struct Entry<T> {
+    successors: Vec<T>,
+    num_predecessors: usize,
+}
+
+impl<T> Default for Entry<T> {
+    fn default() -> Self {
+        Self { successors: Default::default(), num_predecessors: 0 }
+    }
+}
diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs
index af987230def..c0736ede090 100644
--- a/crates/rust-analyzer/src/main_loop.rs
+++ b/crates/rust-analyzer/src/main_loop.rs
@@ -70,7 +70,7 @@ pub(crate) enum Task {
 #[derive(Debug)]
 pub(crate) enum PrimeCachesProgress {
     Begin,
-    Report(ide::PrimeCachesProgress),
+    Report(ide::ParallelPrimeCachesProgress),
     End { cancelled: bool },
 }
 
@@ -291,11 +291,23 @@ fn handle_event(&mut self, event: Event) -> Result<()> {
                         }
                         PrimeCachesProgress::Report(report) => {
                             state = Progress::Report;
-                            message = Some(format!(
-                                "{}/{} ({})",
-                                report.n_done, report.n_total, report.on_crate
-                            ));
-                            fraction = Progress::fraction(report.n_done, report.n_total);
+
+                            message = match &report.crates_currently_indexing[..] {
+                                [crate_name] => Some(format!(
+                                    "{}/{} ({})",
+                                    report.crates_done, report.crates_total, crate_name
+                                )),
+                                [crate_name, rest @ ..] => Some(format!(
+                                    "{}/{} ({} + {} more)",
+                                    report.crates_done,
+                                    report.crates_total,
+                                    crate_name,
+                                    rest.len()
+                                )),
+                                _ => None,
+                            };
+
+                            fraction = Progress::fraction(report.crates_done, report.crates_total);
                         }
                         PrimeCachesProgress::End { cancelled } => {
                             state = Progress::End;
@@ -497,7 +509,7 @@ fn handle_event(&mut self, event: Event) -> Result<()> {
                 let analysis = self.snapshot().analysis;
                 move |sender| {
                     sender.send(Task::PrimeCaches(PrimeCachesProgress::Begin)).unwrap();
-                    let res = analysis.prime_caches(|progress| {
+                    let res = analysis.parallel_prime_caches(32, |progress| {
                         let report = PrimeCachesProgress::Report(progress);
                         sender.send(Task::PrimeCaches(report)).unwrap();
                     });