Skip to content

Commit 6bcbcd3

Browse files
committed
add derivation pipeline cache
1 parent c7c25fa commit 6bcbcd3

File tree

8 files changed

+155
-22
lines changed

8 files changed

+155
-22
lines changed

Cargo.lock

Lines changed: 25 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ futures = { version = "0.3", default-features = false }
223223
lru = "0.13.0"
224224
metrics = "0.24.0"
225225
metrics-derive = "0.1"
226+
moka = "0.12.11"
226227
parking_lot = "0.12"
227228
rand = { version = "0.9" }
228229
rayon = "1.7"

crates/database/db/src/db.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,14 @@ impl DatabaseReadOperations for Database {
643643
)
644644
}
645645

646+
async fn get_max_block_data_hint_block_number(&self) -> Result<u64, DatabaseError> {
647+
metered!(
648+
DatabaseOperation::GetMaxBlockDataHintBlockNumber,
649+
self,
650+
tx(|tx| async move { tx.get_max_block_data_hint_block_number().await })
651+
)
652+
}
653+
646654
async fn get_l2_block_and_batch_info_by_hash(
647655
&self,
648656
block_hash: B256,

crates/database/db/src/metrics.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ pub(crate) enum DatabaseOperation {
6464
GetL2HeadBlockNumber,
6565
GetNL1Messages,
6666
GetNL2BlockDataHint,
67+
GetMaxBlockDataHintBlockNumber,
6768
GetL2BlockAndBatchInfoByHash,
6869
GetL2BlockInfoByNumber,
6970
GetLatestSafeL2Info,
@@ -132,6 +133,7 @@ impl DatabaseOperation {
132133
Self::GetL2HeadBlockNumber => "get_l2_head_block_number",
133134
Self::GetNL1Messages => "get_n_l1_messages",
134135
Self::GetNL2BlockDataHint => "get_n_l2_block_data_hint",
136+
Self::GetMaxBlockDataHintBlockNumber => "get_max_block_data_hint_block_number",
135137
Self::GetL2BlockAndBatchInfoByHash => "get_l2_block_and_batch_info_by_hash",
136138
Self::GetL2BlockInfoByNumber => "get_l2_block_info_by_number",
137139
Self::GetLatestSafeL2Info => "get_latest_safe_l2_info",

crates/database/db/src/operations.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -996,6 +996,9 @@ pub trait DatabaseReadOperations {
996996
n: usize,
997997
) -> Result<Vec<L1MessageEnvelope>, DatabaseError>;
998998

999+
/// Get the maximum block number for which we have stored extra data hints.
1000+
async fn get_max_block_data_hint_block_number(&self) -> Result<u64, DatabaseError>;
1001+
9991002
/// Get the extra data hints for `n` blocks, starting at the provided block number.
10001003
async fn get_n_l2_block_data_hint(
10011004
&self,
@@ -1374,6 +1377,18 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
13741377
.collect())
13751378
}
13761379

1380+
async fn get_max_block_data_hint_block_number(&self) -> Result<u64, DatabaseError> {
1381+
Ok(models::block_data::Entity::find()
1382+
.select_only()
1383+
.column_as(models::block_data::Column::Number.max(), "max_number")
1384+
.into_tuple::<Option<i64>>()
1385+
.one(self.get_connection())
1386+
.await?
1387+
.flatten()
1388+
.map(|n| n as u64)
1389+
.unwrap_or(0))
1390+
}
1391+
13771392
async fn get_l2_block_and_batch_info_by_hash(
13781393
&self,
13791394
block_hash: B256,

crates/derivation-pipeline/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ scroll-db.workspace = true
2929
futures.workspace = true
3030
metrics.workspace = true
3131
metrics-derive.workspace = true
32+
moka = { workspace = true, features = ["future"] }
3233
tokio.workspace = true
3334
thiserror.workspace = true
3435
tracing.workspace = true
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use moka::future::Cache;
2+
use scroll_alloy_rpc_types_engine::BlockDataHint;
3+
use scroll_db::{Database, DatabaseReadOperations};
4+
use std::{sync::Arc, time::Duration};
5+
6+
use crate::DerivationPipelineError;
7+
8+
/// The default size of the block data hint pre-fetch cache (number of entries).
9+
pub(crate) const DEFAULT_CACHE_SIZE: usize = 80_000;
10+
11+
/// The default number of block data hints to pre-fetch.
12+
pub(crate) const DEFAULT_PREFETCH_COUNT: usize = 60_000;
13+
14+
/// The default time-to-live (TTL) for cache entries.
15+
pub(crate) const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(120 * 60); // 120 minutes
16+
17+
#[derive(Debug, Clone)]
18+
pub struct PreFetchCache {
19+
db: Arc<Database>,
20+
hint_cache: Cache<u64, BlockDataHint>,
21+
// TODO: Add a cache for batches.
22+
max_block_data_hint_block_number: u64,
23+
pre_fetch_count: usize,
24+
}
25+
26+
impl PreFetchCache {
27+
/// Creates a new block data hint pre-fetch cache with default settings.
28+
pub(crate) async fn new(
29+
db: Arc<Database>,
30+
size: usize,
31+
ttl: Duration,
32+
pre_fetch_count: usize,
33+
) -> Result<Self, DerivationPipelineError> {
34+
let max_block_data_hint_block_number = db.get_max_block_data_hint_block_number().await?;
35+
Ok(Self {
36+
db,
37+
hint_cache: Cache::builder().max_capacity(size as u64).time_to_live(ttl).build(),
38+
max_block_data_hint_block_number,
39+
pre_fetch_count,
40+
})
41+
}
42+
43+
/// Fetches the block data hint for the given block number, using the cache if possible.
44+
pub(crate) async fn get(
45+
&self,
46+
block_number: u64,
47+
) -> Result<Option<BlockDataHint>, DerivationPipelineError> {
48+
if block_number > self.max_block_data_hint_block_number {
49+
return Ok(None);
50+
}
51+
52+
if let Some(cached_hint) = self.hint_cache.get(&block_number).await {
53+
return Ok(Some(cached_hint));
54+
}
55+
56+
let hints = self.db.get_n_l2_block_data_hint(block_number, self.pre_fetch_count).await?;
57+
let requested = hints.first().cloned();
58+
for (idx, hint) in hints.into_iter().enumerate() {
59+
self.hint_cache.insert(block_number + idx as u64, hint.clone()).await;
60+
}
61+
62+
Ok(requested)
63+
}
64+
}

crates/derivation-pipeline/src/lib.rs

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ use scroll_codec::{decoding::payload::PayloadData, Codec, CodecError, DecodingEr
1414
use scroll_db::{Database, DatabaseReadOperations, L1MessageKey};
1515
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
1616

17+
mod cache;
18+
use cache::{PreFetchCache, DEFAULT_CACHE_SIZE, DEFAULT_CACHE_TTL, DEFAULT_PREFETCH_COUNT};
19+
1720
mod data_source;
1821

1922
mod error;
@@ -108,6 +111,8 @@ pub struct DerivationPipelineWorker<P> {
108111
futures: FuturesOrdered<DerivationPipelineFuture>,
109112
/// A reference to the database.
110113
database: Arc<Database>,
114+
/// A cache for pre-fetching derivation pipeline data from the database.
115+
cache: PreFetchCache,
111116
/// A L1 provider.
112117
l1_provider: P,
113118
/// The L1 message queue index at which the V2 L1 message queue was enabled.
@@ -118,22 +123,30 @@ pub struct DerivationPipelineWorker<P> {
118123

119124
impl<P> DerivationPipelineWorker<P> {
120125
/// Returns a new instance of the [`DerivationPipeline`].
121-
pub fn new(
126+
pub async fn new(
122127
l1_provider: P,
123128
database: Arc<Database>,
124129
l1_v2_message_queue_start_index: u64,
125130
batch_receiver: UnboundedReceiver<Arc<BatchDerivationRequest>>,
126131
result_sender: UnboundedSender<BatchDerivationResult>,
127-
) -> Self {
128-
Self {
132+
) -> Result<Self, DerivationPipelineError> {
133+
let cache = PreFetchCache::new(
134+
database.clone(),
135+
DEFAULT_CACHE_SIZE,
136+
DEFAULT_CACHE_TTL,
137+
DEFAULT_PREFETCH_COUNT,
138+
)
139+
.await?;
140+
Ok(Self {
129141
batch_receiver,
130142
result_sender,
131143
futures: FuturesOrdered::new(),
132144
database,
145+
cache,
133146
l1_provider,
134147
l1_v2_message_queue_start_index,
135148
metrics: DerivationPipelineMetrics::default(),
136-
}
149+
})
137150
}
138151
}
139152

@@ -158,7 +171,9 @@ where
158171
l1_v2_message_queue_start_index,
159172
batch_receiver,
160173
result_sender,
161-
);
174+
)
175+
.await
176+
.expect("Failed to create derivation pipeline worker");
162177

163178
worker.run().await;
164179
});
@@ -207,6 +222,7 @@ where
207222

208223
fn derivation_future(&self, request: Arc<BatchDerivationRequest>) -> DerivationPipelineFuture {
209224
let db = self.database.clone();
225+
let cache = self.cache.clone();
210226
let metrics = self.metrics.clone();
211227
let provider = self.l1_provider.clone();
212228
let l1_v2_message_queue_start_index = self.l1_v2_message_queue_start_index;
@@ -228,7 +244,7 @@ where
228244

229245
// derive the attributes and attach the corresponding batch info.
230246
let result =
231-
derive(batch, target_status, provider, db, l1_v2_message_queue_start_index)
247+
derive(batch, target_status, provider, cache, l1_v2_message_queue_start_index)
232248
.await
233249
.map_err(|err| (request.clone(), err))?;
234250

@@ -286,11 +302,11 @@ type DerivationPipelineFuture = Pin<
286302

287303
/// Returns a [`BatchDerivationResult`] from the [`BatchCommitData`] by deriving the payload
288304
/// attributes for each L2 block in the batch.
289-
pub async fn derive<L1P: L1Provider + Sync + Send, DB: DatabaseReadOperations>(
305+
pub async fn derive<L1P: L1Provider + Sync + Send>(
290306
batch: BatchCommitData,
291307
target_status: BatchStatus,
292308
l1_provider: L1P,
293-
db: DB,
309+
cache: PreFetchCache,
294310
l1_v2_message_queue_start_index: u64,
295311
) -> Result<BatchDerivationResult, DerivationPipelineError> {
296312
// fetch the blob then decode the input batch.
@@ -323,14 +339,7 @@ pub async fn derive<L1P: L1Provider + Sync + Send, DB: DatabaseReadOperations>(
323339
let blocks = decoded.data.into_l2_blocks();
324340
let mut attributes = Vec::with_capacity(blocks.len());
325341

326-
let start = blocks.first().map(|b| b.context.number);
327-
let block_data = if let Some(start) = start {
328-
db.get_n_l2_block_data_hint(start, blocks.len()).await?
329-
} else {
330-
vec![]
331-
};
332-
333-
for (i, mut block) in blocks.into_iter().enumerate() {
342+
for mut block in blocks {
334343
// query the appropriate amount of l1 messages.
335344
let mut txs = Vec::with_capacity(block.context.num_transactions as usize);
336345
for _ in 0..block.context.num_l1_messages {
@@ -369,7 +378,10 @@ pub async fn derive<L1P: L1Provider + Sync + Send, DB: DatabaseReadOperations>(
369378
},
370379
transactions: Some(txs),
371380
no_tx_pool: true,
372-
block_data_hint: block_data.get(i).cloned().unwrap_or_else(BlockDataHint::none),
381+
block_data_hint: cache
382+
.get(block.context.number)
383+
.await?
384+
.unwrap_or_else(BlockDataHint::none),
373385
gas_limit: Some(block.context.gas_limit),
374386
},
375387
};
@@ -446,7 +458,7 @@ async fn iter_l1_messages_from_payload<L1P: L1Provider>(
446458
#[cfg(test)]
447459
mod tests {
448460
use super::*;
449-
use std::sync::Arc;
461+
use std::{sync::Arc, time::Duration};
450462

451463
use alloy_eips::Decodable2718;
452464
use alloy_primitives::{address, b256, bytes, U256};
@@ -643,10 +655,12 @@ mod tests {
643655
for message in l1_messages {
644656
db.insert_l1_message(message).await?;
645657
}
658+
let cache = PreFetchCache::new(db.clone(), 100, Duration::from_secs(60), 10).await?;
646659

647660
let l1_provider = MockL1Provider { db: db.clone(), blobs: HashMap::new() };
648661

649-
let result = derive(batch_data, BatchStatus::Committed, l1_provider, db, u64::MAX).await?;
662+
let result =
663+
derive(batch_data, BatchStatus::Committed, l1_provider, cache, u64::MAX).await?;
650664
let attribute = result
651665
.attributes
652666
.iter()
@@ -745,10 +759,11 @@ mod tests {
745759
}
746760

747761
let l1_provider = MockL1Provider { db: db.clone(), blobs: HashMap::new() };
762+
let cache = PreFetchCache::new(db.clone(), 100, Duration::from_secs(60), 10).await?;
748763

749764
// derive attributes and extract l1 messages.
750765
let attributes =
751-
derive(batch_data, BatchStatus::Committed, l1_provider, db, u64::MAX).await?;
766+
derive(batch_data, BatchStatus::Committed, l1_provider, cache, u64::MAX).await?;
752767
let derived_l1_messages: Vec<_> = attributes
753768
.attributes
754769
.into_iter()
@@ -800,10 +815,11 @@ mod tests {
800815
}
801816

802817
let l1_provider = MockL1Provider { db: db.clone(), blobs: HashMap::new() };
818+
let cache = PreFetchCache::new(db.clone(), 100, Duration::from_secs(60), 10).await?;
803819

804820
// derive attributes and extract l1 messages.
805821
let attributes =
806-
derive(batch_data, BatchStatus::Committed, l1_provider, db, u64::MAX).await?;
822+
derive(batch_data, BatchStatus::Committed, l1_provider, cache, u64::MAX).await?;
807823
let derived_l1_messages: Vec<_> = attributes
808824
.attributes
809825
.into_iter()
@@ -916,8 +932,9 @@ mod tests {
916932
blob_path
917933
)]),
918934
};
935+
let cache = PreFetchCache::new(db.clone(), 100, Duration::from_secs(60), 10).await?;
919936

920-
let attributes = derive(batch_data, BatchStatus::Committed, l1_provider, db, u64::MAX).await?;
937+
let attributes = derive(batch_data, BatchStatus::Committed, l1_provider, cache, u64::MAX).await?;
921938

922939
let attribute = attributes.attributes.last().unwrap();
923940
let expected = ScrollPayloadAttributes {

0 commit comments

Comments
 (0)