Skip to content

Commit 995abc6

Browse files
authored
perf: persistent cache use multi-thread (#9010)
* perf: persistent cache use multi-thread * fix: ci
1 parent ded9fc0 commit 995abc6

File tree

3 files changed

+66
-47
lines changed

3 files changed

+66
-47
lines changed

crates/rspack_core/src/cache/persistent/occasion/make/module_graph.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,16 @@ pub async fn recovery_module_graph(
132132
let mut need_check_dep = vec![];
133133
let mut partial = ModuleGraphPartial::default();
134134
let mut mg = ModuleGraph::new(vec![], Some(&mut partial));
135-
for (_, v) in storage.load(SCOPE).await? {
136-
let mut node: Node =
137-
from_bytes(&v, context).expect("unexpected module graph deserialize failed");
135+
let nodes: Vec<_> = storage
136+
.load(SCOPE)
137+
.await?
138+
.into_par_iter()
139+
.map(|(_, v)| {
140+
from_bytes::<Node, CacheableContext>(&v, context)
141+
.expect("unexpected module graph deserialize failed")
142+
})
143+
.collect();
144+
for mut node in nodes {
138145
for (dep, parent_block) in node.dependencies {
139146
mg.set_parents(
140147
*dep.id(),

crates/rspack_core/src/cache/persistent/snapshot/mod.rs

Lines changed: 50 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{path::Path, sync::Arc};
66
use rspack_cacheable::{from_bytes, to_bytes};
77
use rspack_error::Result;
88
use rspack_fs::ReadableFileSystem;
9+
use rspack_futures::FuturesResults;
910
use rspack_paths::{ArcPath, AssertUtf8};
1011
use rustc_hash::FxHashSet as HashSet;
1112

@@ -42,37 +43,38 @@ impl Snapshot {
4243
#[tracing::instrument("Cache::Snapshot::add", skip_all)]
4344
pub async fn add(&self, paths: impl Iterator<Item = &Path>) {
4445
let default_strategy = StrategyHelper::compile_time();
45-
let mut helper = StrategyHelper::new(self.fs.clone());
46-
// TODO use multi thread
46+
let helper = StrategyHelper::new(self.fs.clone());
4747
// TODO merge package version file
48-
for path in paths {
49-
let utf8_path = path.assert_utf8();
50-
// check path exists
51-
if self.fs.metadata(utf8_path).is_err() {
52-
continue;
53-
}
54-
// TODO directory path should check all sub file
55-
let path_str = utf8_path.as_str();
56-
if self.options.is_immutable_path(path_str) {
57-
continue;
58-
}
59-
if self.options.is_managed_path(path_str) {
60-
if let Some(v) = helper.package_version(path).await {
61-
self.storage.set(
62-
SCOPE,
63-
path.as_os_str().as_encoded_bytes().to_vec(),
64-
to_bytes::<_, ()>(&v, &()).expect("should to bytes success"),
65-
);
66-
continue;
48+
let _ = paths
49+
.map(|path| async {
50+
let utf8_path = path.assert_utf8();
51+
// check path exists
52+
if self.fs.metadata(utf8_path).is_err() {
53+
return;
6754
}
68-
}
69-
// compiler time
70-
self.storage.set(
71-
SCOPE,
72-
path.as_os_str().as_encoded_bytes().to_vec(),
73-
to_bytes::<_, ()>(&default_strategy, &()).expect("should to bytes success"),
74-
);
75-
}
55+
// TODO directory path should check all sub file
56+
let path_str = utf8_path.as_str();
57+
if self.options.is_immutable_path(path_str) {
58+
return;
59+
}
60+
if self.options.is_managed_path(path_str) {
61+
if let Some(v) = helper.package_version(path).await {
62+
self.storage.set(
63+
SCOPE,
64+
path.as_os_str().as_encoded_bytes().to_vec(),
65+
to_bytes::<_, ()>(&v, &()).expect("should to bytes success"),
66+
);
67+
return;
68+
}
69+
}
70+
// compiler time
71+
self.storage.set(
72+
SCOPE,
73+
path.as_os_str().as_encoded_bytes().to_vec(),
74+
to_bytes::<_, ()>(&default_strategy, &()).expect("should to bytes success"),
75+
);
76+
})
77+
.collect::<FuturesResults<_>>();
7678
}
7779

7880
pub fn remove(&self, paths: impl Iterator<Item = &Path>) {
@@ -85,16 +87,26 @@ impl Snapshot {
8587

8688
#[tracing::instrument("Cache::Snapshot::calc_modified_path", skip_all)]
8789
pub async fn calc_modified_paths(&self) -> Result<(HashSet<ArcPath>, HashSet<ArcPath>)> {
88-
let mut helper = StrategyHelper::new(self.fs.clone());
90+
let helper = StrategyHelper::new(self.fs.clone());
91+
92+
let results = self
93+
.storage
94+
.load(SCOPE)
95+
.await?
96+
.iter()
97+
.map(|(key, value)| async {
98+
let path: ArcPath = Path::new(&*String::from_utf8_lossy(key)).into();
99+
let strategy: Strategy =
100+
from_bytes::<Strategy, ()>(value, &()).expect("should from bytes success");
101+
let validate = helper.validate(&path, &strategy).await;
102+
(path, validate)
103+
})
104+
.collect::<FuturesResults<_>>();
105+
89106
let mut modified_path = HashSet::default();
90107
let mut deleted_path = HashSet::default();
91-
92-
// TODO use multi thread
93-
for (key, value) in self.storage.load(SCOPE).await? {
94-
let path: ArcPath = Path::new(&*String::from_utf8_lossy(&key)).into();
95-
let strategy: Strategy =
96-
from_bytes::<Strategy, ()>(&value, &()).expect("should from bytes success");
97-
match helper.validate(&path, &strategy).await {
108+
for (path, validate) in results.into_inner() {
109+
match validate {
98110
ValidateResult::Modified => {
99111
modified_path.insert(path);
100112
}
@@ -123,7 +135,7 @@ mod tests {
123135
};
124136
}
125137

126-
#[tokio::test]
138+
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
127139
async fn should_snapshot_work() {
128140
let fs = Arc::new(MemoryFileSystem::default());
129141
let storage = Arc::new(MemoryStorage::default());

crates/rspack_core/src/cache/persistent/snapshot/strategy.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ use std::{
44
time::{SystemTime, UNIX_EPOCH},
55
};
66

7+
use dashmap::DashMap;
78
use rspack_cacheable::cacheable;
89
use rspack_fs::ReadableFileSystem;
910
use rspack_paths::{ArcPath, AssertUtf8};
10-
use rustc_hash::FxHashMap as HashMap;
1111

1212
/// Snapshot check strategy
1313
#[cacheable]
@@ -38,7 +38,7 @@ pub enum ValidateResult {
3838

3939
pub struct StrategyHelper {
4040
fs: Arc<dyn ReadableFileSystem>,
41-
package_version_cache: HashMap<ArcPath, Option<String>>,
41+
package_version_cache: DashMap<ArcPath, Option<String>>,
4242
}
4343

4444
impl StrategyHelper {
@@ -60,7 +60,7 @@ impl StrategyHelper {
6060

6161
/// get path file version in package.json
6262
#[async_recursion::async_recursion]
63-
async fn package_version_with_cache(&mut self, path: &Path) -> Option<String> {
63+
async fn package_version_with_cache(&self, path: &Path) -> Option<String> {
6464
if let Some(version) = self.package_version_cache.get(path) {
6565
return version.clone();
6666
}
@@ -99,15 +99,15 @@ impl StrategyHelper {
9999
Strategy::CompileTime(now)
100100
}
101101
/// get path file package version strategy
102-
pub async fn package_version(&mut self, path: &Path) -> Option<Strategy> {
102+
pub async fn package_version(&self, path: &Path) -> Option<Strategy> {
103103
self
104104
.package_version_with_cache(path)
105105
.await
106106
.map(Strategy::PackageVersion)
107107
}
108108

109109
/// validate path file by target strategy
110-
pub async fn validate(&mut self, path: &Path, strategy: &Strategy) -> ValidateResult {
110+
pub async fn validate(&self, path: &Path, strategy: &Strategy) -> ValidateResult {
111111
match strategy {
112112
Strategy::PackageVersion(version) => {
113113
if let Some(ref cur_version) = self.package_version_with_cache(path).await {
@@ -172,7 +172,7 @@ mod tests {
172172
};
173173
assert!(time1 < time2);
174174

175-
let mut helper = StrategyHelper::new(fs.clone());
175+
let helper = StrategyHelper::new(fs.clone());
176176
// modified_time
177177
assert_eq!(
178178
helper.modified_time(Path::new("/file1")).await,

0 commit comments

Comments
 (0)