Skip to content

Commit 8caf596

Browse files
committed
wip
1 parent fe3b774 commit 8caf596

File tree

5 files changed

+37
-28
lines changed

5 files changed

+37
-28
lines changed

crates/node/src/full/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,11 @@ impl Node {
116116

117117
// --- build pipeline
118118

119-
let (mut pipeline, _) = Pipeline::new(provider.clone(), 100);
119+
let (mut pipeline, _) = Pipeline::new(provider.clone(), 10);
120120
let block_downloader = BatchBlockDownloader::new_gateway(gateway_client.clone(), 10);
121121
pipeline.add_stage(Blocks::new(provider.clone(), block_downloader));
122122
pipeline.add_stage(Classes::new(provider.clone(), gateway_client.clone(), 3));
123-
// pipeline.add_stage(StateTrie::new(provider.clone()));
123+
pipeline.add_stage(StateTrie::new(provider.clone()));
124124

125125
// --- build rpc server
126126

crates/sync/pipeline/src/lib.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,10 @@ pub type PipelineFut = BoxFuture<'static, PipelineResult<()>>;
8282

8383
#[derive(Debug, thiserror::Error)]
8484
pub enum Error {
85-
#[error("Stage not found: {id}")]
85+
#[error("stage not found: {id}")]
8686
StageNotFound { id: String },
8787

88-
#[error("Stage '{id}' execution failed: {error}")]
88+
#[error("stage {id} execution failed: {error}")]
8989
StageExecution { id: &'static str, error: katana_stage::Error },
9090

9191
#[error(transparent)]
@@ -135,7 +135,7 @@ impl PipelineHandle {
135135
/// Panics if the [`Pipeline`] has been dropped.
136136
pub fn stop(&self) {
137137
info!(target: "pipeline", "Signaling pipeline to stop");
138-
self.tx.send(Some(PipelineCommand::Stop)).expect("channel closed");
138+
let _ = self.tx.send(Some(PipelineCommand::Stop));
139139
}
140140

141141
/// Wait until the [`Pipeline`] has stopped.
@@ -254,12 +254,17 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
254254
}
255255
}
256256

257-
_ = self.run_loop() => { }
257+
result = self.run_loop() => {
258+
if let Err(error) = result {
259+
error!(target: "pipeline", %error, "Pipeline finished due to error.");
260+
break;
261+
}
262+
}
258263

259264
}
260265
}
261266

262-
info!(target: "pipeline", "Pipeline finished.");
267+
info!(target: "pipeline", "Pipeline shutting down.");
263268

264269
Ok(())
265270
}

crates/sync/stage/src/blocks/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,12 @@ where
104104
let blocks = self
105105
.downloader
106106
.download_blocks(input.from(), input.to())
107-
.instrument(info_span!(target: "stage", "blocks.download", to = %input.to(), from = %input.from()))
107+
.instrument(info_span!(target: "stage", "blocks.download", from = %input.from(), to = %input.to()))
108108
.await
109109
.map_err(Error::Gateway)?;
110110

111111
if !blocks.is_empty() {
112-
let span = info_span!(target: "stage", "blocks.insert", to = %input.to(), from = %input.from());
112+
let span = info_span!(target: "stage", "blocks.insert", from = %input.from(), to = %input.to());
113113
let _enter = span.enter();
114114

115115
// Validate chain invariant before storing

crates/sync/stage/src/classes.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ where
131131
let declared_class_hashes = self.get_declared_classes(input.from(), input.to())?;
132132

133133
if declared_class_hashes.is_empty() {
134-
debug!("No classes declared within the block range", from = %input.from(), to = %input.to());
134+
debug!(from = %input.from(), to = %input.to(), "No classes declared within the block range");
135135
} else {
136136
let total_classes = declared_classes.len();
137137

crates/sync/stage/src/trie.rs

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ use katana_primitives::Felt;
44
use katana_provider::api::block::HeaderProvider;
55
use katana_provider::api::state_update::StateUpdateProvider;
66
use katana_provider::api::trie::TrieWriter;
7+
use katana_rpc_types::class;
78
use starknet::macros::short_string;
89
use starknet_types_core::hash::{Poseidon, StarkHash};
9-
use tracing::{error, info_span, trace, trace_span};
10+
use tracing::{debug, error, info_span, trace, trace_span};
1011

1112
use crate::{Stage, StageExecutionInput, StageExecutionOutput, StageResult};
1213

@@ -41,7 +42,7 @@ where
4142
fn execute<'a>(&'a mut self, input: &'a StageExecutionInput) -> BoxFuture<'a, StageResult> {
4243
Box::pin(async move {
4344
for block_number in input.from()..=input.to() {
44-
let span = info_span!("compute_state_root", %block_number);
45+
let span = info_span!("state_trie.compute_state_root", %block_number);
4546
let _enter = span.enter();
4647

4748
let expected_state_root = self
@@ -55,20 +56,27 @@ where
5556
.state_update(block_number.into())?
5657
.ok_or(Error::MissingStateUpdate(block_number))?;
5758

58-
let class_trie_root = self
59-
.provider
60-
.trie_insert_declared_classes(block_number, &state_update.declared_classes)?;
61-
6259
let contract_trie_root =
6360
self.provider.trie_insert_contract_updates(block_number, &state_update)?;
6461

65-
// Compute the state root:
66-
// hash("STARKNET_STATE_V0", contract_trie_root, class_trie_root)
67-
let computed_state_root = Poseidon::hash_array(&[
68-
short_string!("STARKNET_STATE_V0"),
69-
contract_trie_root,
70-
class_trie_root,
71-
]);
62+
let computed_state_root = if dbg!(!state_update.declared_classes.is_empty()) {
63+
let class_trie_root = self.provider.trie_insert_declared_classes(
64+
block_number,
65+
dbg!(&state_update.declared_classes),
66+
)?;
67+
68+
// Compute the state root:
69+
// hash("STARKNET_STATE_V0", contract_trie_root, class_trie_root)
70+
let computed_state_root = Poseidon::hash_array(&[
71+
short_string!("STARKNET_STATE_V0"),
72+
dbg!(contract_trie_root),
73+
dbg!(class_trie_root),
74+
]);
75+
76+
computed_state_root
77+
} else {
78+
contract_trie_root
79+
};
7280

7381
// Verify that the computed state root matches the expected state root from the
7482
// block header
@@ -88,11 +96,7 @@ where
8896
.into());
8997
}
9098

91-
trace!(
92-
block = %block_number,
93-
state_root = %format!("{computed_state_root:#x}"),
94-
"State root verified successfully."
95-
);
99+
debug!(block = %block_number, "State root verified successfully.");
96100
}
97101

98102
Ok(StageExecutionOutput { last_block_processed: input.to() })

0 commit comments

Comments
 (0)