
Commit 2db8472

update
1 parent 2d96dd5 commit 2db8472

File tree

6 files changed, +130 -91 lines changed


crates/katana/cli/src/args.rs

Lines changed: 2 additions & 3 deletions
@@ -22,7 +22,6 @@ use serde::{Deserialize, Serialize};
 use tracing::{info, Subscriber};
 use tracing_log::LogTracer;
 use tracing_subscriber::{fmt, EnvFilter};
-use url::Url;

 use crate::file::NodeArgsConfig;
 use crate::options::*;
@@ -136,8 +135,8 @@ impl NodeArgs {

     fn init_logging(&self) -> Result<()> {
         const DEFAULT_LOG_FILTER: &str =
-            "pipeline=debug,info,tasks=debug,executor=trace,forking::backend=trace,blockifier=off,\
-             jsonrpsee_server=off,hyper=off,messaging=debug,node=error";
+            "pipeline=debug,stage=debug,info,tasks=debug,executor=trace,forking::backend=trace,\
+             blockifier=off,jsonrpsee_server=off,hyper=off,messaging=debug,node=error";

         let filter = if self.development.dev {
             &format!("{DEFAULT_LOG_FILTER},server=debug")
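The new default filter adds a stage=debug directive so the per-stage logs introduced below are emitted by default. The string is a tracing_subscriber EnvFilter directive list: comma-separated target=level entries plus a bare fallback level. A minimal, hypothetical sketch of installing such a filter, separate from the crate's actual init_logging path:

use tracing_subscriber::{fmt, EnvFilter};

fn main() {
    // Per-target levels plus a bare `info` fallback, mirroring the shape of the new default.
    let filter = EnvFilter::new("pipeline=debug,stage=debug,info,jsonrpsee_server=off");

    // Install a formatting subscriber gated by the filter (needs the `env-filter` feature).
    fmt().with_env_filter(filter).init();

    tracing::info!(target: "stage", "visible under this filter");
}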

crates/katana/node/src/lib.rs

Lines changed: 5 additions & 5 deletions
@@ -33,7 +33,7 @@ use katana_db::mdbx::DbEnv;
 use katana_executor::implementation::blockifier::BlockifierFactory;
 use katana_executor::{ExecutionFlags, ExecutorFactory};
 use katana_pipeline::stage::{Blocks, Classes};
-use katana_pipeline::{stage, Pipeline, PipelineHandle};
+use katana_pipeline::{Pipeline, PipelineHandle};
 use katana_pool::ordering::FiFo;
 use katana_pool::validation::stateful::TxValidator;
 use katana_pool::TxPool;
@@ -134,12 +134,12 @@ impl Node {

         let provider = self.backend.blockchain.provider().clone();
         let fgw = SequencerGatewayProvider::starknet_alpha_sepolia();
-        let (mut pipeline, handle) = Pipeline::new(provider.clone(), 10);
+        let (mut pipeline, handle) = Pipeline::new(provider.clone(), 64);

-        pipeline.add_stage(Blocks::new(provider.clone(), fgw.clone()));
-        pipeline.add_stage(Classes::new(provider, fgw.clone()));
+        pipeline.add_stage(Blocks::new(provider.clone(), fgw.clone(), 3));
+        pipeline.add_stage(Classes::new(provider, fgw.clone(), 3));

-        handle.set_tip(100);
+        handle.set_tip(1000);

         self.task_manager
             .task_spawner()

crates/katana/pipeline/src/lib.rs

Lines changed: 3 additions & 2 deletions
@@ -132,8 +132,9 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
             let input = StageExecutionInput { from: checkpoint + 1, to };
             stage.execute(&input).await?;
             self.provider.set_checkpoint(id, to)?;
-        }

+            info!(target: "pipeline", %id, from = %checkpoint, %to, "Stage execution completed.");
+        }
         Ok(())
     }
 }
@@ -193,7 +194,7 @@ mod tests {
     async fn stage_checkpoint() {
         let provider = test_provider();

-        let (mut pipeline, handle) = Pipeline::new(&provider, 10);
+        let (mut pipeline, _handle) = Pipeline::new(&provider, 10);
         pipeline.add_stage(MockStage);

         // check that the checkpoint was set
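This hunk sits inside the loop that drives each stage from its last checkpoint; the change moves the closing brace so a completion log fires once per stage. A stripped-down, synchronous sketch of the checkpointing pattern, using simplified stand-ins rather than the real async Pipeline API:

use std::collections::HashMap;

struct StageExecutionInput {
    from: u64,
    to: u64,
}

trait Stage {
    fn id(&self) -> &'static str;
    fn execute(&mut self, input: &StageExecutionInput) -> Result<(), String>;
}

struct MockStage;

impl Stage for MockStage {
    fn id(&self) -> &'static str {
        "Mock"
    }

    fn execute(&mut self, input: &StageExecutionInput) -> Result<(), String> {
        println!("processing blocks {}..={}", input.from, input.to);
        Ok(())
    }
}

// Advance every stage from its last checkpoint up to `to`; the checkpoint is persisted
// only after the stage finishes, so an interrupted run re-executes the unfinished range.
fn run_to(
    stages: &mut [Box<dyn Stage>],
    checkpoints: &mut HashMap<&'static str, u64>,
    to: u64,
) -> Result<(), String> {
    for stage in stages.iter_mut() {
        let checkpoint = checkpoints.get(stage.id()).copied().unwrap_or(0);
        if checkpoint >= to {
            continue; // nothing new for this stage
        }
        let input = StageExecutionInput { from: checkpoint + 1, to };
        stage.execute(&input)?;
        checkpoints.insert(stage.id(), to);
    }
    Ok(())
}

fn main() -> Result<(), String> {
    let mut stages: Vec<Box<dyn Stage>> = vec![Box::new(MockStage)];
    let mut checkpoints = HashMap::new();
    run_to(&mut stages, &mut checkpoints, 64)?;
    assert_eq!(checkpoints["Mock"], 64);
    Ok(())
}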

crates/katana/pipeline/src/stage/blocks.rs

Lines changed: 65 additions & 48 deletions
@@ -1,12 +1,13 @@
 use std::sync::Arc;
+use std::time::Duration;

 use backon::{ExponentialBuilder, Retryable};
 use katana_primitives::block::{BlockNumber, SealedBlockWithStatus};
 use katana_primitives::state::{StateUpdates, StateUpdatesWithClasses};
 use katana_provider::traits::block::BlockWriter;
 use starknet::providers::sequencer::models::{BlockId, StateUpdateWithBlock};
 use starknet::providers::{ProviderError, SequencerGatewayProvider};
-use tracing::warn;
+use tracing::{debug, warn};

 use super::{Stage, StageExecutionInput, StageResult};

@@ -23,8 +24,13 @@ pub struct Blocks<P> {
 }

 impl<P> Blocks<P> {
-    pub fn new(provider: P, feeder_gateway: SequencerGatewayProvider) -> Self {
-        Self { provider, downloader: Downloader::new(feeder_gateway) }
+    pub fn new(
+        provider: P,
+        feeder_gateway: SequencerGatewayProvider,
+        download_batch_size: usize,
+    ) -> Self {
+        let downloader = Downloader::new(feeder_gateway, download_batch_size);
+        Self { provider, downloader }
     }
 }

@@ -36,22 +42,25 @@ impl<P: BlockWriter> Stage for Blocks<P> {

     async fn execute(&mut self, input: &StageExecutionInput) -> StageResult {
         // Download all blocks concurrently
-        let blocks = self.downloader.fetch_blocks_range(input.from, input.to, 10).await?;
-
-        // Then process them sequentially
-        for data in blocks {
-            let StateUpdateWithBlock { state_update, block: fgw_block } = data;
-
-            let block = SealedBlockWithStatus::from(fgw_block);
-            let su = StateUpdates::from(state_update);
-            let su = StateUpdatesWithClasses { state_updates: su, ..Default::default() };
-
-            let _ = self.provider.insert_block_with_states_and_receipts(
-                block,
-                su,
-                Vec::new(),
-                Vec::new(),
-            );
+        let blocks = self.downloader.download_blocks(input.from, input.to).await?;
+
+        if !blocks.is_empty() {
+            debug!(target: "stage", id = %self.id(), total = %blocks.len(), "Storing blocks to storage.");
+            // Store blocks to storage
+            for block in blocks {
+                let StateUpdateWithBlock { state_update, block: fgw_block } = block;
+
+                let block = SealedBlockWithStatus::from(fgw_block);
+                let su = StateUpdates::from(state_update);
+                let su = StateUpdatesWithClasses { state_updates: su, ..Default::default() };
+
+                let _ = self.provider.insert_block_with_states_and_receipts(
+                    block,
+                    su,
+                    Vec::new(),
+                    Vec::new(),
+                );
+            }
         }

         Ok(())
@@ -60,61 +69,69 @@ impl<P: BlockWriter> Stage for Blocks<P> {

 #[derive(Debug, Clone)]
 struct Downloader {
+    batch_size: usize,
     client: Arc<SequencerGatewayProvider>,
 }

 impl Downloader {
-    fn new(client: SequencerGatewayProvider) -> Self {
-        Self { client: Arc::new(client) }
+    fn new(client: SequencerGatewayProvider, batch_size: usize) -> Self {
+        Self { client: Arc::new(client), batch_size }
     }

     /// Fetch blocks in the range [from, to] in batches of `batch_size`.
-    async fn fetch_blocks_range(
+    async fn download_blocks(
         &self,
         from: BlockNumber,
         to: BlockNumber,
-        batch_size: usize,
     ) -> Result<Vec<StateUpdateWithBlock>, Error> {
-        let mut all_results = Vec::with_capacity(to.saturating_sub(from) as usize);
-
-        for batch_start in (from..=to).step_by(batch_size) {
-            let batch_end = (batch_start + batch_size as u64 - 1).min(to);
-
-            // fetch in batches and wait on them before proceeding to the next batch
-            let mut futures = Vec::new();
-            for block_num in batch_start..=batch_end {
-                futures.push(self.fetch_block_with_retry(block_num));
-            }
+        debug!(target: "pipeline", %from, %to, "Downloading blocks.");
+        let mut blocks = Vec::with_capacity(to.saturating_sub(from) as usize);

-            let batch_results = futures::future::join_all(futures).await;
-            all_results.extend(batch_results);
+        for batch_start in (from..=to).step_by(self.batch_size) {
+            let batch_end = (batch_start + self.batch_size as u64 - 1).min(to);
+            let batch = self.fetch_blocks_with_retry(batch_start, batch_end).await?;
+            blocks.extend(batch);
         }

-        all_results.into_iter().collect()
+        Ok(blocks)
     }

-    /// Fetch a single block with the given block number with retry mechanism.
-    async fn fetch_block_with_retry(
+    /// Fetch blocks with the given block number with retry mechanism at a batch level.
+    async fn fetch_blocks_with_retry(
         &self,
-        block: BlockNumber,
-    ) -> Result<StateUpdateWithBlock, Error> {
-        let request = || async move {
-            #[allow(deprecated)]
-            self.clone().fetch_block(block).await
-        };
+        from: BlockNumber,
+        to: BlockNumber,
+    ) -> Result<Vec<StateUpdateWithBlock>, Error> {
+        let request = || async move { self.clone().fetch_blocks(from, to).await };

         // Retry only when being rate limited
+        let backoff = ExponentialBuilder::default().with_min_delay(Duration::from_secs(9));
         let result = request
-            .retry(ExponentialBuilder::default())
-            .when(|e| matches!(e, Error::Gateway(ProviderError::RateLimited)))
+            .retry(backoff)
             .notify(|error, _| {
-                warn!(target: "pipeline", %block, %error, "Retrying block download.");
+                warn!(target: "pipeline", %from, %to, %error, "Retrying block download.");
             })
             .await?;

         Ok(result)
     }

+    async fn fetch_blocks(
+        &self,
+        from: BlockNumber,
+        to: BlockNumber,
+    ) -> Result<Vec<StateUpdateWithBlock>, Error> {
+        let total = to.saturating_sub(from) as usize;
+        let mut requests = Vec::with_capacity(total);
+
+        for i in from..=to {
+            requests.push(self.fetch_block(i));
+        }
+
+        let results = futures::future::join_all(requests).await;
+        results.into_iter().collect()
+    }
+
     /// Fetch a single block with the given block number.
     async fn fetch_block(&self, block: BlockNumber) -> Result<StateUpdateWithBlock, Error> {
         #[allow(deprecated)]
@@ -140,7 +157,7 @@ mod tests {
         let provider = test_provider();
         let feeder_gateway = SequencerGatewayProvider::starknet_alpha_sepolia();

-        let mut stage = Blocks::new(&provider, feeder_gateway);
+        let mut stage = Blocks::new(&provider, feeder_gateway, 10);

         let input = StageExecutionInput { from: from_block, to: to_block };
         let _ = stage.execute(&input).await.expect("failed to execute stage");
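The rewritten downloader walks the requested range in batch_size chunks, runs the requests of a chunk concurrently, and awaits the whole chunk before starting the next, so failures now surface per batch rather than per block. A self-contained toy sketch of that batch-then-join pattern; the fetcher, types, and tokio runtime here are assumptions for illustration:

use futures::future::join_all;

#[derive(Debug)]
struct Block(u64);

// Toy stand-in for the feeder-gateway call; always succeeds here.
async fn fetch_block(n: u64) -> Result<Block, String> {
    Ok(Block(n))
}

// Fetch blocks in [from, to] in fixed-size batches, awaiting each batch before the next.
async fn download_blocks(from: u64, to: u64, batch_size: usize) -> Result<Vec<Block>, String> {
    let mut blocks = Vec::with_capacity(to.saturating_sub(from) as usize + 1);

    for start in (from..=to).step_by(batch_size) {
        let end = (start + batch_size as u64 - 1).min(to);
        // Requests within a batch run concurrently; any error aborts the whole run.
        let batch = join_all((start..=end).map(fetch_block)).await;
        blocks.extend(batch.into_iter().collect::<Result<Vec<_>, _>>()?);
    }

    Ok(blocks)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let blocks = download_blocks(0, 9, 3).await?;
    assert_eq!(blocks.len(), 10);
    Ok(())
}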

crates/katana/pipeline/src/stage/classes.rs

Lines changed: 53 additions & 31 deletions
@@ -1,4 +1,5 @@
 use std::sync::Arc;
+use std::time::Duration;

 use anyhow::Result;
 use backon::{ExponentialBuilder, Retryable};
@@ -10,7 +11,7 @@ use katana_provider::traits::state_update::StateUpdateProvider;
 use katana_rpc_types::class::RpcSierraContractClass;
 use starknet::providers::sequencer::models::{BlockId, DeployedClass};
 use starknet::providers::{ProviderError, SequencerGatewayProvider};
-use tracing::warn;
+use tracing::{debug, warn};

 use super::{Stage, StageExecutionInput, StageResult};

@@ -27,8 +28,13 @@ pub struct Classes<P> {
 }

 impl<P> Classes<P> {
-    pub fn new(provider: P, feeder_gateway: SequencerGatewayProvider) -> Self {
-        Self { provider, downloader: Downloader::new(feeder_gateway) }
+    pub fn new(
+        provider: P,
+        feeder_gateway: SequencerGatewayProvider,
+        download_batch_size: usize,
+    ) -> Self {
+        let downloader = Downloader::new(feeder_gateway, download_batch_size);
+        Self { provider, downloader }
     }
 }

@@ -43,12 +49,18 @@ where

     async fn execute(&mut self, input: &StageExecutionInput) -> StageResult {
         for i in input.from..=input.to {
+            // get the classes declared at block `i`
             let class_hashes = self.provider.declared_classes(i.into())?.unwrap();
             let class_hashes = class_hashes.keys().map(|hash| *hash).collect::<Vec<_>>();

-            let classes = self.downloader.fetch_classes(&class_hashes, i).await?;
-            for (hash, class) in classes {
-                self.provider.set_class(hash, class)?;
+            // fetch the classes artifacts
+            let classes = self.downloader.download_classes(&class_hashes, i).await?;
+
+            if !classes.is_empty() {
+                debug!(target: "stage", id = %self.id(), total = %classes.len(), "Storing classes to storage.");
+                for (hash, class) in class_hashes.iter().zip(classes) {
+                    self.provider.set_class(*hash, class)?;
+                }
             }
         }

@@ -58,53 +70,63 @@ where

 #[derive(Debug, Clone)]
 struct Downloader {
+    batch_size: usize,
     client: Arc<SequencerGatewayProvider>,
 }

 impl Downloader {
-    fn new(client: SequencerGatewayProvider) -> Self {
-        Self { client: Arc::new(client) }
+    fn new(client: SequencerGatewayProvider, batch_size: usize) -> Self {
+        Self { client: Arc::new(client), batch_size }
     }

-    async fn fetch_classes(
+    async fn download_classes(
         &self,
-        classes: &[ClassHash],
+        hashes: &[ClassHash],
         block: BlockNumber,
-    ) -> Result<Vec<(ClassHash, ContractClass)>, Error> {
-        let mut all_results = Vec::with_capacity(classes.len());
-
-        for hash in classes {
-            let mut futures = Vec::new();
-
-            futures.push(self.fetch_class_with_retry(*hash, block));
-            let batch_results = futures::future::join_all(futures).await;
+    ) -> Result<Vec<ContractClass>, Error> {
+        debug!(total = %hashes.len(), %block, "Downloading classes.");
+        let mut classes = Vec::with_capacity(hashes.len());

-            all_results.extend(batch_results);
+        for chunk in hashes.chunks(self.batch_size) {
+            let batch = self.fetch_classes_with_retry(chunk, block).await?;
+            classes.extend(batch);
         }

-        all_results.into_iter().collect()
+        Ok(classes)
     }

-    async fn fetch_class_with_retry(
+    async fn fetch_classes_with_retry(
         &self,
-        hash: ClassHash,
+        classes: &[ClassHash],
         block: BlockNumber,
-    ) -> Result<(ClassHash, ContractClass), Error> {
-        let request = || async move {
-            #[allow(deprecated)]
-            self.clone().fetch_class(hash, block).await
-        };
+    ) -> Result<Vec<ContractClass>, Error> {
+        let request = || async move { self.clone().fetch_classes(classes, block).await };

         // Retry only when being rate limited
+        let backoff = ExponentialBuilder::default().with_min_delay(Duration::from_secs(3));
         let result = request
-            .retry(ExponentialBuilder::default())
-            .when(|e| matches!(e, Error::Gateway(ProviderError::RateLimited)))
+            .retry(backoff)
             .notify(|error, _| {
-                warn!(target: "pipeline", hash = format!("{hash:#x}"), %block, %error, "Retrying class download.");
+                warn!(target: "pipeline", %error, "Retrying class download.");
             })
             .await?;

-        Ok((hash, result))
+        Ok(result)
+    }
+
+    async fn fetch_classes(
+        &self,
+        classes: &[ClassHash],
+        block: BlockNumber,
+    ) -> Result<Vec<ContractClass>, Error> {
+        let mut requests = Vec::with_capacity(classes.len());
+
+        for class in classes {
+            requests.push(self.fetch_class(*class, block));
+        }
+
+        let results = futures::future::join_all(requests).await;
+        results.into_iter().collect()
     }

     async fn fetch_class(
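Both stages now retry a whole batch at a time and give backon's exponential backoff an explicit minimum delay (9s for blocks, 3s for classes), while the .when(...) guard that restricted retries to rate-limit errors is dropped in both places. A minimal, hypothetical sketch of that retry shape, with a toy fallible call standing in for the gateway request and a tokio runtime assumed:

use std::time::Duration;

use backon::{ExponentialBuilder, Retryable};

// Toy fallible operation standing in for a feeder-gateway batch request.
async fn fetch_batch() -> Result<Vec<u64>, String> {
    Ok(vec![1, 2, 3])
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // Exponential backoff with a 3s minimum delay, as the Classes stage configures it.
    let backoff = ExponentialBuilder::default().with_min_delay(Duration::from_secs(3));

    let request = || async { fetch_batch().await };

    let batch = request
        .retry(backoff)
        .notify(|error, delay| eprintln!("retrying in {delay:?} after error: {error}"))
        .await?;

    println!("downloaded {} items", batch.len());
    Ok(())
}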

crates/katana/pipeline/tests/sync.rs

Lines changed: 2 additions & 2 deletions
@@ -16,8 +16,8 @@ async fn fgw_sync() {
     // build stages

     let fgw = SequencerGatewayProvider::starknet_alpha_sepolia();
-    let blocks = stage::Blocks::new(db_provider.clone(), fgw.clone());
-    let classes = stage::Classes::new(db_provider.clone(), fgw);
+    let blocks = stage::Blocks::new(db_provider.clone(), fgw.clone(), 10);
+    let classes = stage::Classes::new(db_provider.clone(), fgw, 10);

     let (mut pipeline, handle) = Pipeline::new(db_provider.clone(), chunk_size);
     pipeline.add_stage(blocks);
