Skip to content

Commit 3f9c88b

Browse files
committed
node integration
1 parent 303da6c commit 3f9c88b

File tree

8 files changed

+48
-36
lines changed

8 files changed

+48
-36
lines changed

Cargo.lock

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/src/service/block_producer_tests.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use katana_gas_price_oracle::GasPriceOracle;
66
use katana_primitives::transaction::{ExecutableTx, InvokeTx};
77
use katana_primitives::Felt;
88
use katana_provider::providers::db::DbProvider;
9-
use katana_tasks::TaskManager;
109

1110
use super::*;
1211
use crate::backend::storage::Blockchain;

crates/gateway/gateway-server/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ version.workspace = true
77

88
[dependencies]
99
katana-gateway-types.workspace = true
10-
katana-executor.workspace = true
11-
katana-core.workspace = true
1210
katana-primitives.workspace = true
1311
katana-metrics.workspace = true
1412
katana-pool.workspace = true

crates/node/src/full/mod.rs

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
use std::future::IntoFuture;
44
use std::sync::Arc;
55

6+
use alloy_provider::RootProvider;
67
use anyhow::Result;
78
use http::header::CONTENT_TYPE;
89
use http::Method;
910
use jsonrpsee::RpcModule;
1011
use katana_chain_spec::ChainSpec;
1112
use katana_executor::ExecutionFlags;
13+
use katana_gas_price_oracle::GasPriceOracle;
1214
use katana_gateway_client::Client as SequencerGateway;
1315
use katana_metrics::exporters::prometheus::PrometheusRecorder;
1416
use katana_metrics::{Report, Server as MetricsServer};
@@ -27,6 +29,7 @@ use tracing::{error, info};
2729

2830
use crate::config::db::DbConfig;
2931
use crate::config::metrics::MetricsConfig;
32+
use crate::full::pending::PreconfStateFactory;
3033

3134
mod exit;
3235
mod pending;
@@ -75,6 +78,7 @@ pub struct Node {
7578
pub pipeline: Pipeline<DbProvider>,
7679
pub rpc_server: RpcServer,
7780
pub gateway_client: SequencerGateway,
81+
pub chain_tip_watcher: ChainTipWatcher<RootProvider>,
7882
}
7983

8084
impl Node {
@@ -114,12 +118,32 @@ impl Node {
114118

115119
// --- build pipeline
116120

117-
let (mut pipeline, _) = Pipeline::new(provider.clone(), 50);
121+
let (mut pipeline, pipeline_handle) = Pipeline::new(provider.clone(), 50);
118122
let block_downloader = BatchBlockDownloader::new_gateway(gateway_client.clone(), 8);
119123
pipeline.add_stage(Blocks::new(provider.clone(), block_downloader));
120124
pipeline.add_stage(Classes::new(provider.clone(), gateway_client.clone(), 8));
121125
pipeline.add_stage(StateTrie::new(provider.clone()));
122126

127+
// --
128+
129+
let core_contract = match config.network {
130+
Network::Mainnet => {
131+
katana_starknet::StarknetCore::new_http_mainnet(&config.eth_rpc_url)?
132+
}
133+
Network::Sepolia => {
134+
katana_starknet::StarknetCore::new_http_sepolia(&config.eth_rpc_url)?
135+
}
136+
};
137+
138+
let chain_tip_watcher = ChainTipWatcher::new(core_contract);
139+
140+
let preconf_factory = PreconfStateFactory::new(
141+
provider.clone(),
142+
gateway_client.clone(),
143+
pipeline_handle.subscribe_blocks(),
144+
chain_tip_watcher.subscribe(),
145+
);
146+
123147
// --- build rpc server
124148

125149
let mut rpc_modules = RpcModule::new(());
@@ -148,6 +172,8 @@ impl Node {
148172
pool.clone(),
149173
task_spawner.clone(),
150174
starknet_api_cfg,
175+
Box::new(preconf_factory),
176+
GasPriceOracle::create_for_testing(),
151177
);
152178

153179
if config.rpc.apis.contains(&RpcModuleKind::Starknet) {
@@ -194,6 +220,7 @@ impl Node {
194220
rpc_server,
195221
task_manager,
196222
gateway_client,
223+
chain_tip_watcher,
197224
config: Arc::new(config),
198225
})
199226
}
@@ -212,18 +239,7 @@ impl Node {
212239

213240
let pipeline_handle = self.pipeline.handle();
214241

215-
let core_contract = match self.config.network {
216-
Network::Mainnet => {
217-
katana_starknet::StarknetCore::new_http_mainnet(&self.config.eth_rpc_url).await?
218-
}
219-
Network::Sepolia => {
220-
katana_starknet::StarknetCore::new_http_sepolia(&self.config.eth_rpc_url).await?
221-
}
222-
};
223-
224-
let tip_watcher = ChainTipWatcher::new(core_contract);
225-
226-
let mut tip_subscription = tip_watcher.subscribe();
242+
let mut tip_subscription = self.chain_tip_watcher.subscribe();
227243
let pipeline_handle_clone = pipeline_handle.clone();
228244

229245
self.task_manager
@@ -237,7 +253,7 @@ impl Node {
237253
.build_task()
238254
.graceful_shutdown()
239255
.name("Chain tip watcher")
240-
.spawn(tip_watcher.into_future());
256+
.spawn(self.chain_tip_watcher.into_future());
241257

242258
// spawn a task for updating the pipeline's tip based on chain tip changes
243259
self.task_manager.task_spawner().spawn(async move {

crates/node/src/full/pending/mod.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ use std::time::Duration;
33

44
use katana_gateway::client::Client;
55
use katana_gateway::types::{ConfirmedTransaction, ErrorCode, PreConfirmedBlock, StateDiff};
6+
use katana_pipeline::PipelineBlockSubscription;
67
use katana_primitives::block::BlockNumber;
78
use katana_primitives::state::StateUpdates;
89
use katana_provider::api::state::StateFactoryProvider;
910
use parking_lot::Mutex;
10-
use tokio::sync::watch;
1111
use tracing::error;
1212

1313
use crate::full::pending::state::PreconfStateProvider;
@@ -19,7 +19,7 @@ pub mod state;
1919
#[derive(Debug)]
2020
pub struct PreconfStateFactory<P: StateFactoryProvider> {
2121
// from pipeline
22-
latest_synced_block: watch::Receiver<BlockNumber>,
22+
latest_synced_block: PipelineBlockSubscription,
2323
gateway_client: Client,
2424
provider: P,
2525

@@ -31,7 +31,7 @@ impl<P: StateFactoryProvider> PreconfStateFactory<P> {
3131
pub fn new(
3232
state_factory_provider: P,
3333
gateway_client: Client,
34-
latest_synced_block: watch::Receiver<BlockNumber>,
34+
latest_synced_block: PipelineBlockSubscription,
3535
tip_subscription: TipSubscription,
3636
) -> Self {
3737
let shared_preconf_block = SharedPreconfBlockData::default();
@@ -55,7 +55,7 @@ impl<P: StateFactoryProvider> PreconfStateFactory<P> {
5555
}
5656

5757
pub fn state(&self) -> PreconfStateProvider {
58-
let latest_block_num = *self.latest_synced_block.borrow();
58+
let latest_block_num = self.latest_synced_block.block().unwrap();
5959
let base = self.provider.historical(latest_block_num.into()).unwrap().unwrap();
6060

6161
let preconf_block = self.shared_preconf_block.inner.lock();
@@ -114,7 +114,7 @@ struct PreconfBlockWatcher {
114114
gateway_client: Client,
115115

116116
// from pipeline
117-
latest_synced_block: watch::Receiver<BlockNumber>,
117+
latest_synced_block: PipelineBlockSubscription,
118118
// from tip watcher (actual tip of the chain)
119119
latest_block: TipSubscription,
120120

@@ -124,7 +124,8 @@ struct PreconfBlockWatcher {
124124

125125
impl PreconfBlockWatcher {
126126
async fn run(&mut self) {
127-
let mut current_preconf_block_num = *self.latest_synced_block.borrow() + 1;
127+
let mut current_preconf_block_num =
128+
self.latest_synced_block.block().map(|b| b + 1).unwrap_or(0);
128129

129130
loop {
130131
if current_preconf_block_num >= self.latest_block.tip() {
@@ -180,7 +181,7 @@ impl PreconfBlockWatcher {
180181
break;
181182
}
182183

183-
let latest_synced_block_num = *self.latest_synced_block.borrow();
184+
let latest_synced_block_num = self.latest_synced_block.block().unwrap();
184185
current_preconf_block_num = latest_synced_block_num + 1;
185186
}
186187

crates/rpc/rpc/src/starknet/mod.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ use std::fmt::Debug;
44
use std::future::Future;
55
use std::sync::Arc;
66

7-
use katana_core::backend::Backend;
8-
use katana_executor::ExecutorFactory;
97
use katana_chain_spec::ChainSpec;
108
use katana_core::backend::storage::Database;
9+
use katana_core::backend::Backend;
1110
use katana_core::utils::get_current_timestamp;
11+
use katana_executor::ExecutorFactory;
1212
use katana_gas_price_oracle::GasPriceOracle;
1313
use katana_pool::TransactionPool;
1414
use katana_primitives::block::{BlockHashOrNumber, BlockIdOrTag, FinalityStatus, GasPrices};
@@ -53,7 +53,6 @@ use katana_rpc_types_builder::{BlockBuilder, ReceiptBuilder};
5353
use katana_tasks::{Result as TaskResult, TaskSpawner};
5454

5555
use crate::permit::Permits;
56-
use crate::starknet::pending::PendingBlockProvider;
5756
use crate::utils::events::{Cursor, EventBlockId};
5857
use crate::{utils, DEFAULT_ESTIMATE_FEE_MAX_CONCURRENT_REQUESTS};
5958

@@ -97,9 +96,8 @@ where
9796
{
9897
pool: Pool,
9998
chain_spec: Arc<ChainSpec>,
100-
pool: P,
99+
pool: Pool,
101100
gas_oracle: GasPriceOracle,
102-
preconf_provider: Box<dyn PendingBlockProvider>,
103101
storage: BlockchainProvider<Box<dyn Database>>,
104102
forked_client: Option<ForkedClient>,
105103
task_spawner: TaskSpawner,
@@ -154,7 +152,7 @@ where
154152
pool,
155153
None,
156154
task_spawner,
157-
config,
155+
config,
158156
pending_block_provider,
159157
gas_oracle,
160158
)

crates/starknet/src/lib.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
1515
use alloy_network::Ethereum;
1616
use alloy_primitives::Address;
17-
use alloy_provider::{Provider, RootProvider};
17+
use alloy_provider::Provider;
18+
pub use alloy_provider::RootProvider;
1819
use alloy_rpc_types_eth::{Filter, FilterBlockOption, FilterSet, Log, Topic};
1920
use alloy_sol_types::{sol, SolEvent};
2021
use anyhow::Result;
@@ -225,7 +226,7 @@ impl StarknetCore<RootProvider<Ethereum>> {
225226
///
226227
/// * `rpc_url` - The HTTP URL of the Ethereum RPC endpoint
227228
/// * `contract_address` - The address of the Starknet Core Contract
228-
pub async fn new_http(rpc_url: impl AsRef<str>, contract_address: Address) -> Result<Self> {
229+
pub fn new_http(rpc_url: impl AsRef<str>, contract_address: Address) -> Result<Self> {
229230
let provider = RootProvider::<Ethereum>::new_http(reqwest::Url::parse(rpc_url.as_ref())?);
230231
Ok(Self::new(provider, contract_address))
231232
}
@@ -236,7 +237,7 @@ impl StarknetCore<RootProvider<Ethereum>> {
236237
/// # Arguments
237238
///
238239
/// * `rpc_url` - The HTTP URL of the Ethereum RPC endpoint
239-
pub async fn new_http_mainnet(rpc_url: impl AsRef<str>) -> Result<Self> {
240+
pub fn new_http_mainnet(rpc_url: impl AsRef<str>) -> Result<Self> {
240241
let provider = RootProvider::<Ethereum>::new_http(reqwest::Url::parse(rpc_url.as_ref())?);
241242
Ok(Self::new_mainnet(provider))
242243
}
@@ -247,7 +248,7 @@ impl StarknetCore<RootProvider<Ethereum>> {
247248
/// # Arguments
248249
///
249250
/// * `rpc_url` - The HTTP URL of the Ethereum RPC endpoint
250-
pub async fn new_http_sepolia(rpc_url: impl AsRef<str>) -> Result<Self> {
251+
pub fn new_http_sepolia(rpc_url: impl AsRef<str>) -> Result<Self> {
251252
let provider = RootProvider::<Ethereum>::new_http(reqwest::Url::parse(rpc_url.as_ref())?);
252253
Ok(Self::new_sepolia(provider))
253254
}

crates/sync/pipeline/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ enum PipelineCommand {
117117
/// This subscription receives notifications whenever the pipeline completes processing
118118
/// a block through all stages. The block number represents the highest block that has
119119
/// been successfully processed by all pipeline stages for a given batch.
120+
#[derive(Clone)]
120121
pub struct PipelineBlockSubscription {
121122
rx: watch::Receiver<Option<BlockNumber>>,
122123
}

0 commit comments

Comments
 (0)