Skip to content

Commit 23719c6

Browse files
committed
block downloader separation
1 parent 765cc1b commit 23719c6

File tree

8 files changed

+116
-62
lines changed

8 files changed

+116
-62
lines changed

Cargo.lock

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/katana/cli/src/args.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,8 @@ impl NodeArgs {
136136

137137
fn init_logging(&self) -> Result<()> {
138138
const DEFAULT_LOG_FILTER: &str =
139-
"pipeline=debug,info,tasks=debug,executor=trace,forking::backend=trace,\
140-
blockifier=off,jsonrpsee_server=off,hyper=off,\
141-
messaging=debug,node=error";
139+
"pipeline=debug,info,tasks=debug,executor=trace,forking::backend=trace,blockifier=off,\
140+
jsonrpsee_server=off,hyper=off,messaging=debug,node=error";
142141

143142
let filter = if self.development.dev {
144143
&format!("{DEFAULT_LOG_FILTER},server=debug")

crates/katana/core/src/backend/storage.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use katana_primitives::version::ProtocolVersion;
1212
use katana_provider::providers::db::DbProvider;
1313
use katana_provider::providers::fork::ForkedProvider;
1414
use katana_provider::traits::block::{BlockProvider, BlockWriter};
15-
use katana_provider::traits::contract::ContractClassWriter;
15+
use katana_provider::traits::contract::{ContractClassWriter, ContractClassWriterExt};
1616
use katana_provider::traits::env::BlockEnvProvider;
1717
use katana_provider::traits::stage::StageCheckpointProvider;
1818
use katana_provider::traits::state::{StateFactoryProvider, StateRootProvider, StateWriter};
@@ -43,6 +43,7 @@ pub trait Database:
4343
+ StateRootProvider
4444
+ StateWriter
4545
+ ContractClassWriter
46+
+ ContractClassWriterExt
4647
+ StateFactoryProvider
4748
+ BlockEnvProvider
4849
+ ClassTrieWriter
@@ -67,6 +68,7 @@ impl<T> Database for T where
6768
+ StateRootProvider
6869
+ StateWriter
6970
+ ContractClassWriter
71+
+ ContractClassWriterExt
7072
+ StateFactoryProvider
7173
+ BlockEnvProvider
7274
+ ClassTrieWriter

crates/katana/pipeline/Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,16 @@ katana-executor.workspace = true
1111
katana-pool.workspace = true
1212
katana-primitives.workspace = true
1313
katana-provider = { workspace = true, features = [ "test-utils" ] }
14+
katana-rpc-types.workspace = true
1415
katana-tasks.workspace = true
1516

1617
anyhow.workspace = true
1718
async-trait.workspace = true
19+
backon = { version = "1.3.0", features = [ "tokio-sleep" ] }
1820
futures.workspace = true
21+
parking_lot = "0.12.3"
22+
serde_json = "1.0.133"
1923
starknet.workspace = true
2024
thiserror.workspace = true
2125
tokio.workspace = true
2226
tracing.workspace = true
23-
serde_json = "1.0.133"
24-
parking_lot = "0.12.3"

crates/katana/pipeline/src/stage/blocks.rs

Lines changed: 72 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
use std::time::Duration;
1+
use std::sync::Arc;
22

3+
use backon::{ExponentialBuilder, Retryable};
34
use katana_primitives::block::{BlockNumber, SealedBlockWithStatus};
45
use katana_primitives::state::{StateUpdates, StateUpdatesWithClasses};
56
use katana_provider::traits::block::BlockWriter;
67
use starknet::providers::sequencer::models::{BlockId, StateUpdateWithBlock};
78
use starknet::providers::{ProviderError, SequencerGatewayProvider};
9+
use tracing::warn;
810

911
use super::{Stage, StageExecutionInput, StageResult};
1012

@@ -17,20 +19,12 @@ pub enum Error {
1719
#[derive(Debug)]
1820
pub struct Blocks<P> {
1921
provider: P,
20-
feeder_gateway: SequencerGatewayProvider,
22+
downloader: Downloader,
2123
}
2224

2325
impl<P> Blocks<P> {
2426
pub fn new(provider: P, feeder_gateway: SequencerGatewayProvider) -> Self {
25-
Self { provider, feeder_gateway }
26-
}
27-
}
28-
29-
impl<P: BlockWriter> Blocks<P> {
30-
#[allow(deprecated)]
31-
async fn fetch_block(&self, block: BlockNumber) -> Result<StateUpdateWithBlock, Error> {
32-
let res = self.feeder_gateway.get_state_update_with_block(BlockId::Number(block)).await?;
33-
Ok(res)
27+
Self { provider, downloader: Downloader::new(feeder_gateway) }
3428
}
3529
}
3630

@@ -41,10 +35,11 @@ impl<P: BlockWriter> Stage for Blocks<P> {
4135
}
4236

4337
async fn execute(&mut self, input: &StageExecutionInput) -> StageResult {
44-
let mut current_block = input.from;
38+
// Download all blocks concurrently
39+
let blocks = self.downloader.fetch_blocks_range(input.from, input.to, 10).await?;
4540

46-
loop {
47-
let data = self.fetch_block(current_block).await?;
41+
// Then process them sequentially
42+
for data in blocks {
4843
let StateUpdateWithBlock { state_update, block: fgw_block } = data;
4944

5045
let block = SealedBlockWithStatus::from(fgw_block);
@@ -57,17 +52,74 @@ impl<P: BlockWriter> Stage for Blocks<P> {
5752
Vec::new(),
5853
Vec::new(),
5954
);
55+
}
6056

61-
tokio::time::sleep(Duration::from_secs(1)).await;
57+
Ok(())
58+
}
59+
}
60+
61+
#[derive(Debug, Clone)]
62+
struct Downloader {
63+
client: Arc<SequencerGatewayProvider>,
64+
}
6265

63-
if current_block == input.to {
64-
break;
65-
} else {
66-
current_block += 1;
66+
impl Downloader {
67+
fn new(client: SequencerGatewayProvider) -> Self {
68+
Self { client: Arc::new(client) }
69+
}
70+
71+
/// Fetch blocks in the range [from, to] in batches of `batch_size`.
72+
async fn fetch_blocks_range(
73+
&self,
74+
from: BlockNumber,
75+
to: BlockNumber,
76+
batch_size: usize,
77+
) -> Result<Vec<StateUpdateWithBlock>, Error> {
78+
let mut all_results = Vec::with_capacity(to.saturating_sub(from) as usize);
79+
80+
for batch_start in (from..=to).step_by(batch_size) {
81+
let batch_end = (batch_start + batch_size as u64 - 1).min(to);
82+
83+
// fetch in batches and wait on them before proceeding to the next batch
84+
let mut futures = Vec::new();
85+
for block_num in batch_start..=batch_end {
86+
futures.push(self.fetch_block_with_retry(block_num));
6787
}
88+
89+
let batch_results = futures::future::join_all(futures).await;
90+
all_results.extend(batch_results);
6891
}
6992

70-
Ok(())
93+
all_results.into_iter().collect()
94+
}
95+
96+
/// Fetch a single block with the given block number with retry mechanism.
97+
async fn fetch_block_with_retry(
98+
&self,
99+
block: BlockNumber,
100+
) -> Result<StateUpdateWithBlock, Error> {
101+
let request = || async move {
102+
#[allow(deprecated)]
103+
self.clone().fetch_block(block).await
104+
};
105+
106+
// Retry only when being rate limited
107+
let result = request
108+
.retry(ExponentialBuilder::default())
109+
.when(|e| matches!(e, Error::Gateway(ProviderError::RateLimited)))
110+
.notify(|error, _| {
111+
warn!(target: "pipeline", %block, %error, "Retrying block download.");
112+
})
113+
.await?;
114+
115+
Ok(result)
116+
}
117+
118+
/// Fetch a single block with the given block number.
119+
async fn fetch_block(&self, block: BlockNumber) -> Result<StateUpdateWithBlock, Error> {
120+
#[allow(deprecated)]
121+
let res = self.client.get_state_update_with_block(BlockId::Number(block)).await?;
122+
Ok(res)
71123
}
72124
}
73125

crates/katana/pipeline/src/stage/classes.rs

Lines changed: 20 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@ use std::time::Duration;
22

33
use anyhow::Result;
44
use katana_primitives::block::BlockNumber;
5-
use katana_primitives::class::{CasmContractClass, ClassHash, CompiledClass, ContractClass};
5+
use katana_primitives::class::{
6+
CasmContractClass, ClassHash, CompiledClass, ContractClass, SierraContractClass,
7+
};
68
use katana_primitives::conversion::rpc::{legacy_rpc_to_class, StarknetRsLegacyContractClass};
7-
use katana_provider::traits::contract::ContractClassWriter;
9+
use katana_provider::traits::contract::{ContractClassWriter, ContractClassWriterExt};
810
use katana_provider::traits::state_update::StateUpdateProvider;
11+
use katana_rpc_types::class::RpcSierraContractClass;
912
use starknet::providers::sequencer::models::{BlockId, DeployedClass};
1013
use starknet::providers::{ProviderError, SequencerGatewayProvider};
11-
use tracing::debug;
14+
use tracing::info;
1215

1316
use super::{Stage, StageExecutionInput, StageResult};
1417

@@ -35,44 +38,31 @@ where
3538
P: StateUpdateProvider + ContractClassWriter,
3639
{
3740
#[allow(deprecated)]
38-
async fn get_class(
39-
&self,
40-
hash: ClassHash,
41-
block: BlockNumber,
42-
) -> Result<(ContractClass, Option<CompiledClass>), Error> {
43-
let block_id = BlockId::Number(block);
41+
async fn get_class(&self, hash: ClassHash, block: BlockNumber) -> Result<ContractClass, Error> {
42+
let class = self.feeder_gateway.get_class_by_hash(hash, BlockId::Number(block)).await?;
4443

45-
let (class, casm) = tokio::join!(
46-
self.feeder_gateway.get_class_by_hash(hash, block_id),
47-
self.feeder_gateway.get_compiled_class_by_class_hash(hash, block_id)
48-
);
49-
50-
let (class, casm) = (class?, casm?);
51-
52-
let (class, casm) = match class {
44+
let class = match class {
5345
DeployedClass::LegacyClass(legacy) => {
5446
let class = to_inner_legacy_class(legacy).unwrap();
55-
(class, None)
47+
class
5648
}
5749

50+
// TODO: implement our own fgw client using our own types for easier conversion
5851
DeployedClass::SierraClass(sierra) => {
59-
// TODO: change this shyte
60-
let value = serde_json::to_value(casm).unwrap();
61-
let casm = serde_json::from_value::<CasmContractClass>(value).unwrap();
62-
let compiled = CompiledClass::Class(casm);
63-
64-
(ContractClass::Class(sierra), Some(compiled))
52+
let rpc_class = RpcSierraContractClass::try_from(sierra).unwrap();
53+
let class = SierraContractClass::try_from(rpc_class).unwrap();
54+
ContractClass::Class(class)
6555
}
6656
};
6757

68-
Ok((class, casm))
58+
Ok(class)
6959
}
7060
}
7161

7262
#[async_trait::async_trait]
7363
impl<P> Stage for Classes<P>
7464
where
75-
P: StateUpdateProvider + ContractClassWriter,
65+
P: StateUpdateProvider + ContractClassWriter + ContractClassWriterExt,
7666
{
7767
fn id(&self) -> &'static str {
7868
"Classes"
@@ -84,16 +74,12 @@ where
8474
let class_hashes = self.provider.declared_classes(i.into())?.unwrap();
8575

8676
// TODO: do this in parallel
87-
for class_hash in class_hashes.keys() {
88-
debug!(target: "pipeline", "Fetching class artifacts for class hash {class_hash:#x}");
77+
for hash in class_hashes.keys() {
78+
info!(target: "pipeline", class_hash = format!("{hash:#x}"), "Fetching class artifacts.");
8979

9080
// 1. fetch sierra and casm class from fgw
91-
let (class, compiled) = self.get_class(*class_hash, i).await?;
92-
93-
self.provider.set_class(*class_hash, class)?;
94-
if let Some(casm) = compiled {
95-
self.provider.set_compiled_class(*class_hash, casm)?;
96-
}
81+
let class = self.get_class(*hash, i).await?;
82+
self.provider.set_class(*hash, class)?;
9783

9884
tokio::time::sleep(Duration::from_secs(1)).await;
9985
}

crates/katana/primitives/src/class.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,8 @@ fn compute_legacy_class_hash(class: &LegacyContractClass) -> Result<Felt, Comput
163163
#[cfg(test)]
164164
mod tests {
165165

166-
use starknet::core::types::contract::SierraClass as StarknetRsSierraContractClass;
167166
use starknet::core::types::contract::legacy::LegacyContractClass as StarknetRsLegacyContractClass;
167+
use starknet::core::types::contract::SierraClass as StarknetRsSierraContractClass;
168168

169169
use super::{ContractClass, LegacyContractClass, SierraContractClass};
170170

crates/katana/storage/db/src/codecs/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
#[cfg(feature = "postcard")]
22
pub mod postcard;
33

4-
use katana_primitives::Felt;
54
use katana_primitives::block::FinalityStatus;
65
use katana_primitives::class::ContractClass;
76
use katana_primitives::contract::ContractAddress;
7+
use katana_primitives::Felt;
88

99
use crate::error::CodecError;
1010

0 commit comments

Comments
 (0)