Skip to content

Commit 2d96dd5

Browse files
committed
fix pipeline loop
1 parent 03924c4 commit 2d96dd5

File tree

4 files changed

+9
-28
lines changed

4 files changed

+9
-28
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/katana/node/src/lib.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use katana_db::mdbx::DbEnv;
3333
use katana_executor::implementation::blockifier::BlockifierFactory;
3434
use katana_executor::{ExecutionFlags, ExecutorFactory};
3535
use katana_pipeline::stage::{Blocks, Classes};
36-
use katana_pipeline::{stage, Pipeline};
36+
use katana_pipeline::{stage, Pipeline, PipelineHandle};
3737
use katana_pool::ordering::FiFo;
3838
use katana_pool::validation::stateful::TxValidator;
3939
use katana_pool::TxPool;
@@ -62,6 +62,8 @@ pub struct LaunchedNode {
6262
pub node: Node,
6363
/// Handle to the rpc server.
6464
pub rpc: RpcServer,
65+
/// Handle to the syncing pipeline.
66+
pub pipeline: PipelineHandle,
6567
}
6668

6769
impl LaunchedNode {
@@ -128,22 +130,11 @@ impl Node {
128130
let block_producer = self.block_producer.clone();
129131
let validator = self.block_producer.validator().clone();
130132

131-
// // --- build sequencing stage
132-
133-
// let sequencing = stage::Sequencing::new(
134-
// pool.clone(),
135-
// backend.clone(),
136-
// self.task_manager.task_spawner(),
137-
// block_producer.clone(),
138-
// self.messaging_config.clone(),
139-
// );
140-
141133
// --- build and start the pipeline
142134

143135
let provider = self.backend.blockchain.provider().clone();
144136
let fgw = SequencerGatewayProvider::starknet_alpha_sepolia();
145137
let (mut pipeline, handle) = Pipeline::new(provider.clone(), 10);
146-
// pipeline.add_stage(Box::new(sequencing));
147138

148139
pipeline.add_stage(Blocks::new(provider.clone(), fgw.clone()));
149140
pipeline.add_stage(Classes::new(provider, fgw.clone()));
@@ -160,7 +151,7 @@ impl Node {
160151
let node_components = (pool, backend, block_producer, validator, self.forked_client.take());
161152
let rpc = spawn(node_components, self.rpc_config.clone()).await?;
162153

163-
Ok(LaunchedNode { node: self, rpc })
154+
Ok(LaunchedNode { node: self, pipeline: handle, rpc })
164155
}
165156
}
166157

crates/katana/pipeline/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ anyhow.workspace = true
1818
async-trait.workspace = true
1919
backon = { version = "1.3.0", features = [ "tokio-sleep" ] }
2020
futures.workspace = true
21-
parking_lot = "0.12.3"
2221
serde_json = "1.0.133"
2322
starknet.workspace = true
2423
thiserror.workspace = true

crates/katana/pipeline/src/lib.rs

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,11 @@
33
pub mod stage;
44

55
use core::future::IntoFuture;
6-
use std::sync::Arc;
76

87
use futures::future::BoxFuture;
98
use katana_primitives::block::BlockNumber;
109
use katana_provider::error::ProviderError;
1110
use katana_provider::traits::stage::StageCheckpointProvider;
12-
use parking_lot::{Condvar, Mutex};
1311
use stage::{Stage, StageExecutionInput};
1412
use tokio::sync::watch;
1513
use tracing::{error, info};
@@ -32,12 +30,6 @@ pub enum Error {
3230
Provider(#[from] ProviderError),
3331
}
3432

35-
#[derive(Debug)]
36-
pub enum PipelineEvents {
37-
UpdateTip(BlockNumber),
38-
Stop,
39-
}
40-
4133
#[derive(Debug)]
4234
pub struct PipelineHandle {
4335
tx: watch::Sender<Option<BlockNumber>>,
@@ -95,15 +87,15 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
9587
let tip = *self.tip.borrow_and_update();
9688

9789
loop {
98-
if let Some(to) = tip {
99-
let to = current_chunk_tip.min(to);
90+
if let Some(tip) = tip {
91+
let to = current_chunk_tip.min(tip);
10092
self.run_once_until(to).await?;
10193

102-
if current_chunk_tip >= to {
94+
if to >= tip {
95+
info!(target: "pipeline", %tip, "Finished processing until tip.");
10396
break;
10497
} else {
105-
current_chunk_tip = (current_chunk_tip + self.chunk_size).min(to);
106-
continue;
98+
current_chunk_tip = (current_chunk_tip + self.chunk_size).min(tip);
10799
}
108100
}
109101
}

0 commit comments

Comments
 (0)