Skip to content

Commit 3168148

Browse files
committed
ide: parallel prime caches
1 parent b4c3148 commit 3168148

File tree

6 files changed

+273
-24
lines changed

6 files changed

+273
-24
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/ide/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ doctest = false
1111

1212
[dependencies]
1313
cov-mark = "2.0.0-pre.1"
14+
crossbeam-channel = "0.5.0"
15+
crossbeam-utils = "0.8.5"
1416
either = "1.5.3"
1517
itertools = "0.10.0"
1618
tracing = "0.1"

crates/ide/src/lib.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ pub use crate::{
8787
moniker::{MonikerKind, MonikerResult, PackageInformation},
8888
move_item::Direction,
8989
navigation_target::NavigationTarget,
90-
prime_caches::PrimeCachesProgress,
90+
prime_caches::{ParallelPrimeCachesProgress, PrimeCachesProgress},
9191
references::ReferenceSearchResult,
9292
rename::RenameError,
9393
runnables::{Runnable, RunnableKind, TestId},
@@ -251,6 +251,13 @@ impl Analysis {
251251
self.with_db(move |db| prime_caches::prime_caches(db, &cb))
252252
}
253253

254+
/// Like `prime_caches`, but primes the caches using `num_worker_threads`
/// background worker threads.
///
/// `cb` is invoked with a [`ParallelPrimeCachesProgress`] snapshot as the
/// run progresses; it must be `Sync` (called from the priming machinery)
/// and `UnwindSafe` (salsa cancellation unwinds through it).
pub fn parallel_prime_caches<F>(&self, num_worker_threads: u8, cb: F) -> Cancellable<()>
where
    F: Fn(ParallelPrimeCachesProgress) + Sync + std::panic::UnwindSafe,
{
    self.with_db(move |db| prime_caches::parallel_prime_caches(db, num_worker_threads, &cb))
}
260+
254261
/// Gets the text of the source file.
255262
pub fn file_text(&self, file_id: FileId) -> Cancellable<Arc<String>> {
256263
self.with_db(|db| db.file_text(file_id))

crates/ide/src/prime_caches.rs

Lines changed: 141 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@
22
//! sometimes is counterproductive when, for example, the first goto definition
33
//! request takes longer to compute. This module implements prepopulation of
44
//! various caches, it's not really advanced at the moment.
5+
mod topologic_sort;
56

67
use hir::db::DefDatabase;
7-
use ide_db::base_db::{SourceDatabase, SourceDatabaseExt};
8-
use rustc_hash::FxHashSet;
8+
use ide_db::base_db::{
9+
salsa::{Database, ParallelDatabase, Snapshot},
10+
Cancelled, CrateGraph, CrateId, SourceDatabase, SourceDatabaseExt,
11+
};
12+
use rustc_hash::{FxHashMap, FxHashSet};
913

1014
use crate::RootDatabase;
1115

@@ -20,29 +24,150 @@ pub struct PrimeCachesProgress {
2024
pub(crate) fn prime_caches(db: &RootDatabase, cb: &(dyn Fn(PrimeCachesProgress) + Sync)) {
2125
let _p = profile::span("prime_caches");
2226
let graph = db.crate_graph();
27+
let to_prime = compute_crates_to_prime(db, &graph);
28+
29+
let n_total = to_prime.len();
30+
for (n_done, &crate_id) in to_prime.iter().enumerate() {
31+
let crate_name = graph[crate_id].display_name.as_deref().unwrap_or_default().to_string();
32+
33+
cb(PrimeCachesProgress { on_crate: crate_name, n_done, n_total });
34+
// This also computes the DefMap
35+
db.import_map(crate_id);
36+
}
37+
}
38+
39+
/// Progress snapshot of a parallel cache-priming run: we're indexing many
/// crates at once, so progress is reported per-event rather than per-crate.
#[derive(Debug)]
pub struct ParallelPrimeCachesProgress {
    /// The crates that we are currently priming.
    pub crates_currently_indexing: Vec<String>,
    /// The total number of crates we want to prime.
    pub crates_total: usize,
    /// The total number of crates that have finished priming.
    pub crates_done: usize,
}
49+
50+
/// Primes the caches of the workspace crates using `num_worker_threads`
/// worker threads, invoking `cb` with a fresh [`ParallelPrimeCachesProgress`]
/// snapshot after every worker event.
///
/// Crates are dispatched in topological order, so a crate is only indexed
/// after its to-be-primed dependencies have been indexed.
pub(crate) fn parallel_prime_caches<F>(db: &RootDatabase, num_worker_threads: u8, cb: &F)
where
    F: Fn(ParallelPrimeCachesProgress) + Sync + std::panic::UnwindSafe,
{
    let _p = profile::span("prime_caches");

    let graph = db.crate_graph();
    // Topologically sort the crates to prime so dependencies are always
    // scheduled before their dependents. Edges pointing at crates we are
    // *not* going to prime are filtered out — they would never be marked
    // done and would block their dependents forever.
    let mut crates_to_prime = {
        let crate_ids = compute_crates_to_prime(db, &graph);

        let mut builder = topologic_sort::TopologicalSortIter::builder();

        for &crate_id in &crate_ids {
            let crate_data = &graph[crate_id];
            let dependencies = crate_data
                .dependencies
                .iter()
                .map(|d| d.crate_id)
                .filter(|i| crate_ids.contains(i));

            builder.add(crate_id, dependencies);
        }

        builder.build()
    };

    // A scoped-thread scope lets the worker closures borrow the channels
    // without `'static` bounds, and (unlike rayon) propagates worker panics
    // out of the scope — see the `.unwrap()` at the end — which keeps
    // salsa's unwinding-based cancellation intact.
    crossbeam_utils::thread::scope(move |s| {
        let (work_sender, work_receiver) = crossbeam_channel::unbounded();
        let (progress_sender, progress_receiver) = crossbeam_channel::unbounded();

        // Events a worker reports back to this coordinating thread.
        enum ParallelPrimeCacheWorkerProgress {
            BeginCrate { crate_id: CrateId, crate_name: String },
            EndCrate { crate_id: CrateId, cancelled: bool },
        }

        let prime_caches_worker = move |db: Snapshot<RootDatabase>| {
            // Runs until the work channel is closed (coordinator dropped it)
            // or a cancellation is observed.
            while let Ok((crate_id, crate_name)) = work_receiver.recv() {
                progress_sender
                    .send(ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name })?;

                // Catch salsa's cancellation unwind here instead of letting
                // it tear down the scope, so it can be reported as a normal
                // `EndCrate { cancelled: true }` event.
                let cancelled = Cancelled::catch(|| {
                    // This also computes the DefMap
                    db.import_map(crate_id);
                })
                .is_err();

                progress_sender
                    .send(ParallelPrimeCacheWorkerProgress::EndCrate { crate_id, cancelled })?;

                if cancelled {
                    break;
                }
            }

            // `?` above uses `SendError` as the worker's error type; a send
            // failure just means the coordinator has already gone away.
            Ok::<_, crossbeam_channel::SendError<_>>(())
        };

        for _ in 0..num_worker_threads {
            let worker = prime_caches_worker.clone();
            // Each worker operates on its own salsa snapshot of the database.
            let db = db.snapshot();
            s.spawn(move |_| worker(db));
        }

        let crates_total = crates_to_prime.len();
        let mut crates_done = 0;

        let mut is_cancelled = false;
        let mut crates_currently_indexing =
            FxHashMap::with_capacity_and_hasher(num_worker_threads as _, Default::default());

        while !crates_to_prime.is_empty() && !is_cancelled {
            // Enqueue every crate that is currently unblocked; the
            // topological iterator only yields crates whose dependencies
            // have been marked done.
            for crate_id in &mut crates_to_prime {
                work_sender
                    .send((
                        crate_id,
                        graph[crate_id].display_name.as_deref().unwrap_or_default().to_string(),
                    ))
                    .ok();
            }

            // A closed progress channel means every worker has exited
            // (e.g. all hit cancellation); stop coordinating.
            let worker_progress = match progress_receiver.recv() {
                Ok(p) => p,
                Err(_) => break,
            };
            match worker_progress {
                ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
                    crates_currently_indexing.insert(crate_id, crate_name);
                }
                ParallelPrimeCacheWorkerProgress::EndCrate { crate_id, cancelled } => {
                    crates_currently_indexing.remove(&crate_id);
                    // Unblocks this crate's dependents for the scheduling
                    // pass at the top of the loop.
                    crates_to_prime.mark_done(crate_id);
                    crates_done += 1;
                    is_cancelled = cancelled;
                }
            };

            let progress = ParallelPrimeCachesProgress {
                crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
                crates_done,
                crates_total,
            };

            cb(progress);
            // Re-raise cancellation on the coordinating thread as well.
            db.unwind_if_cancelled();
        }
    })
    .unwrap();
}
158+
159+
/// Collects the set of crates whose caches are worth priming.
fn compute_crates_to_prime(db: &RootDatabase, graph: &CrateGraph) -> FxHashSet<CrateId> {
    // We're only interested in the workspace crates and the `ImportMap`s of their direct
    // dependencies, though in practice the latter also compute the `DefMap`s.
    // We don't prime transitive dependencies because they're generally not visible in
    // the current workspace.
    let is_workspace_crate = |id: &CrateId| {
        let root_file = graph[*id].root_file_id;
        let source_root = db.file_source_root(root_file);
        !db.source_root(source_root).is_library
    };

    graph
        .iter()
        .filter(is_workspace_crate)
        .flat_map(|id| graph[id].dependencies.iter().map(|dep| dep.crate_id))
        .collect()
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use std::{collections::VecDeque, hash::Hash};
2+
3+
use rustc_hash::FxHashMap;
4+
5+
pub struct TopologicSortIterBuilder<T> {
6+
nodes: FxHashMap<T, Entry<T>>,
7+
}
8+
9+
impl<T> TopologicSortIterBuilder<T>
10+
where
11+
T: Copy + Eq + PartialEq + Hash,
12+
{
13+
fn new() -> Self {
14+
Self { nodes: Default::default() }
15+
}
16+
17+
fn get_or_create_entry(&mut self, item: T) -> &mut Entry<T> {
18+
self.nodes.entry(item).or_default()
19+
}
20+
21+
pub fn add(&mut self, item: T, predecessors: impl IntoIterator<Item = T>) {
22+
let mut num_predecessors = 0;
23+
24+
for predecessor in predecessors.into_iter() {
25+
self.get_or_create_entry(predecessor).successors.push(item);
26+
num_predecessors += 1;
27+
}
28+
29+
let entry = self.get_or_create_entry(item);
30+
entry.num_predecessors += num_predecessors;
31+
}
32+
33+
pub fn build(self) -> TopologicalSortIter<T> {
34+
let ready = self
35+
.nodes
36+
.iter()
37+
.filter_map(
38+
|(item, entry)| if entry.num_predecessors == 0 { Some(*item) } else { None },
39+
)
40+
.collect();
41+
42+
TopologicalSortIter { nodes: self.nodes, ready }
43+
}
44+
}
45+
46+
pub struct TopologicalSortIter<T> {
47+
ready: VecDeque<T>,
48+
nodes: FxHashMap<T, Entry<T>>,
49+
}
50+
51+
impl<T> TopologicalSortIter<T>
52+
where
53+
T: Copy + Eq + PartialEq + Hash,
54+
{
55+
pub fn builder() -> TopologicSortIterBuilder<T> {
56+
TopologicSortIterBuilder::new()
57+
}
58+
59+
pub fn len(&self) -> usize {
60+
self.nodes.len()
61+
}
62+
63+
pub fn is_empty(&self) -> bool {
64+
self.len() == 0
65+
}
66+
67+
pub fn mark_done(&mut self, item: T) {
68+
let entry = self.nodes.remove(&item).expect("invariant: unknown item marked as done");
69+
70+
for successor in entry.successors {
71+
let succ_entry = self
72+
.nodes
73+
.get_mut(&successor)
74+
.expect("invariant: unknown successor referenced by entry");
75+
76+
succ_entry.num_predecessors -= 1;
77+
if succ_entry.num_predecessors == 0 {
78+
self.ready.push_back(successor);
79+
}
80+
}
81+
}
82+
}
83+
84+
impl<T> Iterator for TopologicalSortIter<T> {
    type Item = T;

    /// Pops the next item whose predecessors have all been marked done.
    /// `None` means nothing is *currently* unblocked — not necessarily that
    /// the sort is finished; `mark_done` may make further items ready.
    fn next(&mut self) -> Option<Self::Item> {
        self.ready.pop_front()
    }
}
91+
92+
/// Per-node bookkeeping: outgoing edges plus the number of incoming edges
/// that have not yet been marked done.
struct Entry<T> {
    successors: Vec<T>,
    num_predecessors: usize,
}

// Hand-written rather than derived so that `Entry<T>: Default` holds even
// when `T` itself does not implement `Default`.
impl<T> Default for Entry<T> {
    fn default() -> Self {
        Self { successors: Vec::new(), num_predecessors: 0 }
    }
}

crates/rust-analyzer/src/main_loop.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ pub(crate) enum Task {
7070
#[derive(Debug)]
pub(crate) enum PrimeCachesProgress {
    /// Cache priming has started.
    Begin,
    /// A progress snapshot forwarded from `ide`'s parallel cache priming.
    Report(ide::ParallelPrimeCachesProgress),
    /// Priming finished; `cancelled` is true if it was interrupted.
    End { cancelled: bool },
}
7676

@@ -291,11 +291,23 @@ impl GlobalState {
291291
}
292292
PrimeCachesProgress::Report(report) => {
293293
state = Progress::Report;
294-
message = Some(format!(
295-
"{}/{} ({})",
296-
report.n_done, report.n_total, report.on_crate
297-
));
298-
fraction = Progress::fraction(report.n_done, report.n_total);
294+
295+
message = match &report.crates_currently_indexing[..] {
296+
[crate_name] => Some(format!(
297+
"{}/{} ({})",
298+
report.crates_done, report.crates_total, crate_name
299+
)),
300+
[crate_name, rest @ ..] => Some(format!(
301+
"{}/{} ({} + {} more)",
302+
report.crates_done,
303+
report.crates_total,
304+
crate_name,
305+
rest.len()
306+
)),
307+
_ => None,
308+
};
309+
310+
fraction = Progress::fraction(report.crates_done, report.crates_total);
299311
}
300312
PrimeCachesProgress::End { cancelled } => {
301313
state = Progress::End;
@@ -497,7 +509,7 @@ impl GlobalState {
497509
let analysis = self.snapshot().analysis;
498510
move |sender| {
499511
sender.send(Task::PrimeCaches(PrimeCachesProgress::Begin)).unwrap();
500-
let res = analysis.prime_caches(|progress| {
512+
let res = analysis.parallel_prime_caches(32, |progress| {
501513
let report = PrimeCachesProgress::Report(progress);
502514
sender.send(Task::PrimeCaches(report)).unwrap();
503515
});

0 commit comments

Comments
 (0)