Skip to content

Commit 7694823

Browse files
committed
refactor(sharding): schedule shards by id via pool
1 parent b4a350f commit 7694823

File tree

18 files changed

+195
-86
lines changed

18 files changed

+195
-86
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/node/sharding/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ katana-metrics.workspace = true
2020
katana-pool.workspace = true
2121
katana-primitives.workspace = true
2222
katana-provider.workspace = true
23-
katana-rpc-server.workspace = true
23+
katana-rpc-server = { workspace = true, features = [ "cartridge" ] }
2424
katana-rpc-api.workspace = true
2525
katana-rpc-client.workspace = true
2626
katana-rpc-types.workspace = true

crates/node/sharding/src/lib.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use katana_executor::blockifier::cache::ClassCache;
1111
use katana_executor::blockifier::BlockifierFactory;
1212
use katana_executor::{ExecutionFlags, ExecutorFactory};
1313
use katana_gas_price_oracle::GasPriceOracle;
14-
use katana_pool::TxPool;
1514
use katana_primitives::env::VersionedConstantsOverrides;
1615
use katana_primitives::{ContractAddress, Felt};
1716
use katana_provider::DbProviderFactory;
@@ -20,6 +19,7 @@ use katana_rpc_server::shard::{ShardProvider, ShardRpc};
2019
use katana_rpc_server::starknet::{StarknetApi, StarknetApiConfig};
2120
use katana_rpc_server::{RpcServer, RpcServerHandle};
2221
use katana_sharding::manager::{LazyShardManager, ShardManager};
22+
use katana_sharding::pool::ShardPool;
2323
use katana_sharding::runtime::{Runtime, RuntimeHandle};
2424
use katana_sharding::shard::NoPendingBlockProvider;
2525
use katana_tasks::TaskManager;
@@ -103,13 +103,13 @@ impl Node {
103103
gas_oracle,
104104
starknet_api_config,
105105
task_spawner.clone(),
106+
handle.scheduler().clone(),
106107
config.base_chain_url.clone(),
107108
));
108109

109110
// --- Build RPC server with shard API
110111
let provider = NodeShardProvider {
111112
manager: manager.clone(),
112-
handle: handle.clone(),
113113
chain_spec: config.chain.clone(),
114114
};
115115

@@ -184,12 +184,11 @@ impl LaunchedShardNode {
184184
/// manager and scheduler to the generic `ShardRpc` RPC handler.
185185
struct NodeShardProvider {
186186
manager: Arc<dyn ShardManager>,
187-
handle: RuntimeHandle,
188187
chain_spec: Arc<ChainSpec>,
189188
}
190189

191190
impl ShardProvider for NodeShardProvider {
192-
type Api = StarknetApi<TxPool, NoPendingBlockProvider, DbProviderFactory>;
191+
type Api = StarknetApi<ShardPool, NoPendingBlockProvider, DbProviderFactory>;
193192

194193
fn starknet_api(&self, shard_id: ContractAddress) -> Result<Self::Api, ErrorObjectOwned> {
195194
let shard = self.manager.get(shard_id).map_err(|e| {
@@ -202,12 +201,6 @@ impl ShardProvider for NodeShardProvider {
202201
Ok(shard.starknet_api.clone())
203202
}
204203

205-
fn schedule(&self, shard_id: ContractAddress) {
206-
if let Ok(shard) = self.manager.get(shard_id) {
207-
self.handle.schedule(shard);
208-
}
209-
}
210-
211204
fn shard_ids(&self) -> Vec<ContractAddress> {
212205
self.manager.shard_ids()
213206
}

crates/rpc/rpc-server/src/shard.rs

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ pub trait ShardProvider: Send + Sync + 'static {
3535
/// Resolve a shard's API by ID (for read operations).
3636
fn starknet_api(&self, shard_id: ContractAddress) -> Result<Self::Api, ErrorObjectOwned>;
3737

38-
/// Schedule a shard for execution after a write operation.
39-
fn schedule(&self, shard_id: ContractAddress);
40-
4138
/// List all registered shard IDs.
4239
fn shard_ids(&self) -> Vec<ContractAddress>;
4340

@@ -211,10 +208,7 @@ impl<P: ShardProvider> ShardApiServer for ShardRpc<P> {
211208
invoke_transaction: BroadcastedInvokeTx,
212209
) -> RpcResult<AddInvokeTransactionResponse> {
213210
let api = self.provider.starknet_api(shard_id)?;
214-
let result =
215-
StarknetWriteApiServer::add_invoke_transaction(&api, invoke_transaction).await?;
216-
self.provider.schedule(shard_id);
217-
Ok(result)
211+
StarknetWriteApiServer::add_invoke_transaction(&api, invoke_transaction).await
218212
}
219213

220214
async fn add_declare_transaction(
@@ -223,10 +217,7 @@ impl<P: ShardProvider> ShardApiServer for ShardRpc<P> {
223217
declare_transaction: BroadcastedDeclareTx,
224218
) -> RpcResult<AddDeclareTransactionResponse> {
225219
let api = self.provider.starknet_api(shard_id)?;
226-
let result =
227-
StarknetWriteApiServer::add_declare_transaction(&api, declare_transaction).await?;
228-
self.provider.schedule(shard_id);
229-
Ok(result)
220+
StarknetWriteApiServer::add_declare_transaction(&api, declare_transaction).await
230221
}
231222

232223
async fn add_deploy_account_transaction(
@@ -235,13 +226,8 @@ impl<P: ShardProvider> ShardApiServer for ShardRpc<P> {
235226
deploy_account_transaction: BroadcastedDeployAccountTx,
236227
) -> RpcResult<AddDeployAccountTransactionResponse> {
237228
let api = self.provider.starknet_api(shard_id)?;
238-
let result = StarknetWriteApiServer::add_deploy_account_transaction(
239-
&api,
240-
deploy_account_transaction,
241-
)
242-
.await?;
243-
self.provider.schedule(shard_id);
244-
Ok(result)
229+
StarknetWriteApiServer::add_deploy_account_transaction(&api, deploy_account_transaction)
230+
.await
245231
}
246232

247233
async fn trace_transaction(

crates/sharding/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ katana-db.workspace = true
1212
katana-executor.workspace = true
1313
katana-gas-price-oracle.workspace = true
1414
katana-pool.workspace = true
15+
katana-pool-api.workspace = true
1516
katana-primitives.workspace = true
1617
katana-provider.workspace = true
1718
katana-rpc-client.workspace = true
@@ -21,6 +22,7 @@ katana-rpc-api.workspace = true
2122
katana-tasks.workspace = true
2223

2324
anyhow.workspace = true
25+
futures.workspace = true
2426
parking_lot.workspace = true
2527
tokio.workspace = true
2628
tracing.workspace = true

crates/sharding/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod manager;
2+
pub mod pool;
23
pub mod runtime;
34
pub mod scheduler;
45
pub mod shard;

crates/sharding/src/manager.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::collections::HashMap;
21
use std::fmt::Debug;
32
use std::sync::Arc;
43

@@ -12,9 +11,9 @@ use katana_rpc_client::starknet::Client as StarknetClient;
1211
use katana_rpc_server::starknet::StarknetApiConfig;
1312
use katana_rpc_types::block::GetBlockWithTxHashesResponse;
1413
use katana_tasks::TaskSpawner;
15-
use parking_lot::RwLock;
1614
use url::Url;
1715

16+
use crate::scheduler::{Scheduler, ShardRegistry};
1817
use crate::shard::{Shard, ShardId};
1918

2019
type InitialBlockEnvFetcher = dyn Fn() -> Result<BlockEnv> + Send + Sync + 'static;
@@ -119,7 +118,8 @@ pub struct LazyShardManager {
119118
}
120119

121120
struct LazyShardManagerInner {
122-
shards: RwLock<HashMap<ShardId, Arc<Shard>>>,
121+
shards: ShardRegistry,
122+
scheduler: Scheduler,
123123
// Shared resources for lazy shard creation
124124
chain_spec: Arc<ChainSpec>,
125125
executor_factory: Arc<dyn ExecutorFactory>,
@@ -136,6 +136,7 @@ impl LazyShardManager {
136136
gas_oracle: GasPriceOracle,
137137
starknet_api_config: StarknetApiConfig,
138138
task_spawner: TaskSpawner,
139+
scheduler: Scheduler,
139140
base_chain_url: Url,
140141
) -> Self {
141142
let initial_block_env_fetcher = new_base_chain_block_env_fetcher(base_chain_url);
@@ -146,6 +147,7 @@ impl LazyShardManager {
146147
gas_oracle,
147148
starknet_api_config,
148149
task_spawner,
150+
scheduler,
149151
initial_block_env_fetcher,
150152
)
151153
}
@@ -156,11 +158,13 @@ impl LazyShardManager {
156158
gas_oracle: GasPriceOracle,
157159
starknet_api_config: StarknetApiConfig,
158160
task_spawner: TaskSpawner,
161+
scheduler: Scheduler,
159162
initial_block_env_fetcher: Arc<InitialBlockEnvFetcher>,
160163
) -> Self {
161164
Self {
162165
inner: Arc::new(LazyShardManagerInner {
163-
shards: RwLock::new(HashMap::new()),
166+
shards: scheduler.shard_registry(),
167+
scheduler,
164168
chain_spec,
165169
executor_factory,
166170
gas_oracle,
@@ -212,9 +216,11 @@ impl ShardManager for LazyShardManager {
212216
self.inner.starknet_api_config.clone(),
213217
self.inner.task_spawner.clone(),
214218
initial_block_env,
219+
self.inner.scheduler.clone(),
215220
)?);
216221

217222
shards.insert(id, Arc::clone(&shard));
223+
218224
Ok(shard)
219225
}
220226

crates/sharding/src/pool.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
use std::fmt;
2+
use std::future::Future;
3+
use std::sync::Arc;
4+
5+
use futures::channel::mpsc::Receiver;
6+
use katana_pool::{PendingTransactions, TransactionPool, TxPool};
7+
use katana_pool_api::PoolResult;
8+
use katana_primitives::contract::Nonce;
9+
use katana_primitives::transaction::TxHash;
10+
use katana_primitives::ContractAddress;
11+
12+
use crate::scheduler::Scheduler;
13+
use crate::shard::ShardId;
14+
15+
/// Transaction pool wrapper that triggers shard scheduling when a tx is accepted.
16+
#[derive(Clone)]
17+
pub struct ShardPool {
18+
inner: TxPool,
19+
scheduler: Scheduler,
20+
shard_id: ShardId,
21+
}
22+
23+
impl ShardPool {
24+
pub fn new(inner: TxPool, scheduler: Scheduler, shard_id: ShardId) -> Self {
25+
Self { inner, scheduler, shard_id }
26+
}
27+
}
28+
29+
impl TransactionPool for ShardPool {
30+
type Transaction = <TxPool as TransactionPool>::Transaction;
31+
type Ordering = <TxPool as TransactionPool>::Ordering;
32+
type Validator = <TxPool as TransactionPool>::Validator;
33+
34+
fn add_transaction(
35+
&self,
36+
tx: Self::Transaction,
37+
) -> impl Future<Output = PoolResult<TxHash>> + Send {
38+
let inner = self.inner.clone();
39+
let scheduler = self.scheduler.clone();
40+
let shard_id = self.shard_id;
41+
42+
async move {
43+
let result = inner.add_transaction(tx).await;
44+
45+
if result.is_ok() {
46+
scheduler.schedule(shard_id);
47+
}
48+
49+
result
50+
}
51+
}
52+
53+
fn pending_transactions(&self) -> PendingTransactions<Self::Transaction, Self::Ordering> {
54+
self.inner.pending_transactions()
55+
}
56+
57+
fn contains(&self, hash: TxHash) -> bool {
58+
self.inner.contains(hash)
59+
}
60+
61+
fn get(&self, hash: TxHash) -> Option<Arc<Self::Transaction>> {
62+
self.inner.get(hash)
63+
}
64+
65+
fn add_listener(&self) -> Receiver<TxHash> {
66+
self.inner.add_listener()
67+
}
68+
69+
fn remove_transactions(&self, hashes: &[TxHash]) {
70+
self.inner.remove_transactions(hashes);
71+
}
72+
73+
fn size(&self) -> usize {
74+
self.inner.size()
75+
}
76+
77+
fn validator(&self) -> &Self::Validator {
78+
self.inner.validator()
79+
}
80+
81+
fn get_nonce(&self, address: ContractAddress) -> Option<Nonce> {
82+
self.inner.get_nonce(address)
83+
}
84+
}
85+
86+
impl fmt::Debug for ShardPool {
87+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88+
f.debug_struct("ShardPool")
89+
.field("size", &self.inner.size())
90+
.field("shard_id", &self.shard_id)
91+
.finish()
92+
}
93+
}

crates/sharding/src/runtime.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
use std::sync::Arc;
21
use std::thread;
32
use std::time::{Duration, Instant};
43

54
use anyhow::Result;
65

76
use crate::scheduler::Scheduler;
8-
use crate::shard::Shard;
7+
use crate::shard::ShardId;
98
use crate::worker::Worker;
109

1110
/// Owns the execution resources (scheduler + worker threads) for the shard node.
@@ -41,8 +40,8 @@ pub struct RuntimeHandle {
4140

4241
impl RuntimeHandle {
4342
/// Schedule a shard for execution.
44-
pub fn schedule(&self, shard: Arc<Shard>) {
45-
self.scheduler.schedule(shard);
43+
pub fn schedule(&self, id: ShardId) {
44+
self.scheduler.schedule(id);
4645
}
4746

4847
/// Returns a reference to the underlying scheduler.

0 commit comments

Comments
 (0)