Skip to content

Commit 5da8a46

Browse files
committed
add async pre-fetch logic
1 parent 6bcbcd3 commit 5da8a46

File tree

4 files changed

+50
-18
lines changed

4 files changed

+50
-18
lines changed

crates/database/db/src/operations.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1369,6 +1369,7 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
13691369
) -> Result<Vec<BlockDataHint>, DatabaseError> {
13701370
Ok(models::block_data::Entity::find()
13711371
.filter(models::block_data::Column::Number.gte(block_number as i64))
1372+
.order_by_asc(models::block_data::Column::Number)
13721373
.limit(Some(n as u64))
13731374
.all(self.get_connection())
13741375
.await?

crates/database/db/src/service/retry.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ impl<S: DatabaseService> DatabaseService for Retry<S> {
9090
};
9191

9292
attempt += 1;
93-
tracing::debug!(target: "scroll::chain_orchestrator", ?error, attempt, delay_ms, "Retrying database query");
93+
tracing::debug!(target: "scroll::db", ?error, attempt, delay_ms, "Retrying database query");
9494

9595
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
9696
}

crates/derivation-pipeline/src/cache.rs

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,18 @@ use moka::future::Cache;
22
use scroll_alloy_rpc_types_engine::BlockDataHint;
33
use scroll_db::{Database, DatabaseReadOperations};
44
use std::{sync::Arc, time::Duration};
5+
use tokio::sync::Mutex;
56

67
use crate::DerivationPipelineError;
78

89
/// The default size of the block data hint pre-fetch cache (number of entries).
9-
pub(crate) const DEFAULT_CACHE_SIZE: usize = 80_000;
10+
pub(crate) const CACHE_SIZE: usize = 100_000;
1011

1112
/// The default number of block data hints to pre-fetch.
12-
pub(crate) const DEFAULT_PREFETCH_COUNT: usize = 60_000;
13+
pub(crate) const PREFETCH_RANGE_SIZE: usize = 50_000;
1314

1415
/// The default time-to-live (TTL) for cache entries.
15-
pub(crate) const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(120 * 60); // 120 minutes
16+
pub(crate) const CACHE_TTL: Duration = Duration::from_secs(120 * 60); // 120 minutes
1617

1718
#[derive(Debug, Clone)]
1819
pub struct PreFetchCache {
@@ -21,6 +22,7 @@ pub struct PreFetchCache {
2122
// TODO: Add a cache for batches.
2223
max_block_data_hint_block_number: u64,
2324
pre_fetch_count: usize,
25+
ranges_in_cache: Arc<Mutex<Vec<u64>>>,
2426
}
2527

2628
impl PreFetchCache {
@@ -37,6 +39,7 @@ impl PreFetchCache {
3739
hint_cache: Cache::builder().max_capacity(size as u64).time_to_live(ttl).build(),
3840
max_block_data_hint_block_number,
3941
pre_fetch_count,
42+
ranges_in_cache: Arc::new(Mutex::new(Vec::new())),
4043
})
4144
}
4245

@@ -45,20 +48,52 @@ impl PreFetchCache {
4548
&self,
4649
block_number: u64,
4750
) -> Result<Option<BlockDataHint>, DerivationPipelineError> {
51+
tracing::trace!(
52+
target: "scroll::derivation_pipeline::cache",
53+
block_number = block_number,
54+
"Fetching block data hint from cache",
55+
);
56+
// If the block number is beyond the maximum known block data hint, return None.
4857
if block_number > self.max_block_data_hint_block_number {
4958
return Ok(None);
5059
}
5160

61+
// If the hint is already cached, return it.
5262
if let Some(cached_hint) = self.hint_cache.get(&block_number).await {
5363
return Ok(Some(cached_hint));
5464
}
5565

56-
let hints = self.db.get_n_l2_block_data_hint(block_number, self.pre_fetch_count).await?;
57-
let requested = hints.first().cloned();
58-
for (idx, hint) in hints.into_iter().enumerate() {
59-
self.hint_cache.insert(block_number + idx as u64, hint.clone()).await;
60-
}
66+
// The data hint is not in cache, so pre-fetch the range of hints that contains it.
67+
let range_start = self.pre_fetch_range_start(block_number);
68+
let mut ranges_in_cache = self.ranges_in_cache.lock().await;
69+
70+
// Only fetch if no other task has already fetched this range (a rare race can otherwise cause a duplicate fetch).
71+
if !ranges_in_cache.contains(&range_start) {
72+
tracing::info!(
73+
target: "scroll::derivation_pipeline::cache",
74+
range_start = range_start,
75+
pre_fetch_count = self.pre_fetch_count,
76+
"Pre-fetching block data hints for range",
77+
);
78+
let hints = self.db.get_n_l2_block_data_hint(range_start, self.pre_fetch_count).await?;
79+
for (idx, hint) in hints.iter().enumerate() {
80+
self.hint_cache.insert(range_start + idx as u64, hint.clone()).await;
81+
}
82+
83+
if ranges_in_cache.len() == 2 {
84+
ranges_in_cache.remove(0);
85+
}
86+
87+
ranges_in_cache.push(range_start);
88+
};
89+
90+
drop(ranges_in_cache);
91+
92+
// Now the requested hint should be in cache.
93+
Ok(self.hint_cache.get(&block_number).await)
94+
}
6195

62-
Ok(requested)
96+
const fn pre_fetch_range_start(&self, block: u64) -> u64 {
97+
(block / self.pre_fetch_count as u64) * self.pre_fetch_count as u64
6398
}
6499
}

crates/derivation-pipeline/src/lib.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use scroll_db::{Database, DatabaseReadOperations, L1MessageKey};
1515
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
1616

1717
mod cache;
18-
use cache::{PreFetchCache, DEFAULT_CACHE_SIZE, DEFAULT_CACHE_TTL, DEFAULT_PREFETCH_COUNT};
18+
use cache::{PreFetchCache, CACHE_SIZE, CACHE_TTL, PREFETCH_RANGE_SIZE};
1919

2020
mod data_source;
2121

@@ -130,13 +130,9 @@ impl<P> DerivationPipelineWorker<P> {
130130
batch_receiver: UnboundedReceiver<Arc<BatchDerivationRequest>>,
131131
result_sender: UnboundedSender<BatchDerivationResult>,
132132
) -> Result<Self, DerivationPipelineError> {
133-
let cache = PreFetchCache::new(
134-
database.clone(),
135-
DEFAULT_CACHE_SIZE,
136-
DEFAULT_CACHE_TTL,
137-
DEFAULT_PREFETCH_COUNT,
138-
)
139-
.await?;
133+
let cache =
134+
PreFetchCache::new(database.clone(), CACHE_SIZE, CACHE_TTL, PREFETCH_RANGE_SIZE)
135+
.await?;
140136
Ok(Self {
141137
batch_receiver,
142138
result_sender,

0 commit comments

Comments (0)