Skip to content

Commit a17ef09

Browse files
kariy and claude committed
refactor(shard): extract runtime, scheduler, worker, types, and manager into katana-sharding crate
Move the core shard runtime machinery out of katana-node into a standalone katana-sharding crate so it can be reused independently of the node layer.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 76f8a30 commit a17ef09

File tree

12 files changed

+244
-70
lines changed

12 files changed

+244
-70
lines changed

Cargo.lock

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ members = [
5050
"tests/db-compat",
5151
"tests/reverse-proxy",
5252
"crates/paymaster",
53+
"crates/sharding",
5354
# "tests/snos",
5455
]
5556

@@ -115,6 +116,7 @@ katana-tee = { path = "crates/tee" }
115116
katana-tracing = { path = "crates/tracing" }
116117
katana-trie = { path = "crates/trie" }
117118
katana-paymaster = { path = "crates/paymaster" }
119+
katana-sharding = { path = "crates/sharding" }
118120
katana-utils = { path = "crates/utils" }
119121

120122
# cairo-lang

crates/node/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ katana-rpc-server.workspace = true
2626
katana-rpc-api.workspace = true
2727
katana-rpc-client.workspace = true
2828
katana-rpc-types.workspace = true
29+
katana-sharding.workspace = true
2930
katana-stage.workspace = true
3031
katana-tasks.workspace = true
3132
katana-tee = { workspace = true, optional = true }

crates/node/src/shard/exit.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::future::Future;
22
use std::pin::Pin;
33
use std::task::{Context, Poll};
4+
use std::time::Duration;
45

56
use anyhow::Result;
67
use futures::future::BoxFuture;
@@ -18,23 +19,19 @@ impl<'a> ShardNodeStoppedFuture<'a> {
1819
pub(crate) fn new(handle: &'a LaunchedShardNode) -> Self {
1920
let rpc = handle.rpc.clone();
2021
let task_manager = handle.node.task_manager.clone();
21-
let scheduler = handle.node.scheduler.clone();
22-
let worker_handles = handle.worker_handles.lock().drain(..).collect::<Vec<_>>();
23-
22+
let runtime = handle.node.runtime.lock().take();
2423
let task_spawner = task_manager.task_spawner();
2524

2625
let fut = Box::pin(async move {
2726
task_manager.wait_for_shutdown().await;
28-
scheduler.shutdown();
29-
30-
// Join worker threads via the task manager's blocking executor.
31-
let _ = task_spawner
32-
.spawn_blocking(move || {
33-
for h in worker_handles {
34-
let _ = h.join();
35-
}
36-
})
37-
.await;
27+
28+
if let Some(runtime) = runtime {
29+
let _ = task_spawner
30+
.spawn_blocking(move || {
31+
runtime.shutdown_timeout(Duration::from_secs(30));
32+
})
33+
.await;
34+
}
3835

3936
rpc.stop()?;
4037
Ok(())

crates/node/src/shard/mod.rs

Lines changed: 29 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,8 @@
11
pub mod block_context;
22
pub mod config;
33
pub mod exit;
4-
pub mod manager;
5-
pub mod scheduler;
6-
pub mod types;
7-
pub mod worker;
84

95
use std::sync::Arc;
10-
use std::thread;
116

127
use anyhow::Result;
138
use jsonrpsee::types::ErrorObjectOwned;
@@ -26,28 +21,28 @@ use katana_rpc_client::starknet::Client as StarknetClient;
2621
use katana_rpc_server::shard::{ShardProvider, ShardRpc};
2722
use katana_rpc_server::starknet::{StarknetApi, StarknetApiConfig};
2823
use katana_rpc_server::{RpcServer, RpcServerHandle};
24+
use katana_sharding::manager::{LazyShardManager, ShardManager};
25+
use katana_sharding::runtime::{RuntimeHandle, ShardRuntime};
26+
use katana_sharding::types::NoPendingBlockProvider;
2927
use katana_tasks::TaskManager;
3028
use parking_lot::{Mutex, RwLock};
3129
use tracing::info;
3230

3331
use self::block_context::BlockContextListener;
3432
use self::config::ShardNodeConfig;
3533
use self::exit::ShardNodeStoppedFuture;
36-
use self::manager::{LazyShardManager, ShardManager};
37-
use self::scheduler::ShardScheduler;
38-
use self::types::NoPendingBlockProvider;
3934

4035
/// A shard node instance.
4136
#[must_use = "ShardNode does nothing unless launched."]
4237
#[derive(Debug)]
4338
pub struct Node {
4439
pub config: Arc<ShardNodeConfig>,
4540
pub manager: Arc<dyn ShardManager>,
46-
pub scheduler: ShardScheduler,
47-
pub block_env: Arc<RwLock<BlockEnv>>,
41+
pub handle: RuntimeHandle,
4842
pub task_manager: TaskManager,
4943
pub rpc_server: RpcServer,
5044
pub block_context_listener: BlockContextListener,
45+
pub runtime: Mutex<Option<ShardRuntime>>,
5146
}
5247

5348
impl Node {
@@ -114,11 +109,16 @@ impl Node {
114109
starknet_version: katana_primitives::version::CURRENT_STARKNET_VERSION,
115110
}));
116111

112+
// --- Build runtime (scheduler + workers)
113+
let runtime =
114+
ShardRuntime::new(config.worker_count, config.time_quantum, block_env.clone());
115+
let handle = runtime.handle();
116+
117117
// --- Build base chain client and block context listener
118118
let starknet_client = StarknetClient::new(config.base_chain_url.clone());
119119
let block_context_listener = BlockContextListener::new(
120120
starknet_client,
121-
block_env.clone(),
121+
block_env,
122122
config.chain.clone(),
123123
config.block_poll_interval,
124124
);
@@ -132,13 +132,10 @@ impl Node {
132132
task_spawner.clone(),
133133
));
134134

135-
// --- Build scheduler
136-
let scheduler = ShardScheduler::new(config.time_quantum);
137-
138135
// --- Build RPC server with shard API
139136
let provider = NodeShardProvider {
140137
manager: manager.clone(),
141-
scheduler: scheduler.clone(),
138+
handle: handle.clone(),
142139
chain_spec: config.chain.clone(),
143140
};
144141

@@ -152,11 +149,11 @@ impl Node {
152149
Ok(Node {
153150
config,
154151
manager,
155-
scheduler,
156-
block_env,
152+
handle,
157153
task_manager,
158154
rpc_server,
159155
block_context_listener,
156+
runtime: Mutex::new(Some(runtime)),
160157
})
161158
}
162159

@@ -169,11 +166,7 @@ impl Node {
169166
.name("Block context listener")
170167
.spawn(listener.run());
171168

172-
let worker_handles = worker::spawn_workers(
173-
self.config.worker_count,
174-
self.scheduler.clone(),
175-
self.block_env.clone(),
176-
);
169+
self.runtime.lock().as_mut().expect("runtime already taken").start();
177170

178171
let rpc = self.rpc_server.start(self.config.rpc.socket_addr()).await?;
179172

@@ -183,41 +176,31 @@ impl Node {
183176
"Shard node launched."
184177
);
185178

186-
Ok(LaunchedShardNode {
187-
node: self,
188-
rpc,
189-
worker_handles: parking_lot::Mutex::new(worker_handles),
190-
})
179+
Ok(LaunchedShardNode { node: self, rpc })
191180
}
192181
}
193182

194183
/// Handle to a running shard node.
195184
pub struct LaunchedShardNode {
196185
pub node: Node,
197186
pub rpc: RpcServerHandle,
198-
/// Worker thread handles, taken during shutdown to join them.
199-
worker_handles: Mutex<Vec<thread::JoinHandle<()>>>,
200187
}
201188

202189
impl LaunchedShardNode {
203190
pub async fn stop(&self) -> Result<()> {
204-
// Signal the scheduler to stop — workers will exit their loops.
205-
self.node.scheduler.shutdown();
206191
self.rpc.stop()?;
207192

208-
// Join worker threads via the task manager's blocking executor.
209-
let handles: Vec<_> = self.worker_handles.lock().drain(..).collect();
210-
let _ = self
211-
.node
212-
.task_manager
213-
.task_spawner()
214-
.cpu_bound()
215-
.spawn(move || {
216-
for h in handles {
217-
let _ = h.join();
218-
}
219-
})
220-
.await;
193+
let runtime = self.node.runtime.lock().take();
194+
if let Some(runtime) = runtime {
195+
let _ = self
196+
.node
197+
.task_manager
198+
.task_spawner()
199+
.spawn_blocking(move || {
200+
runtime.shutdown_timeout(std::time::Duration::from_secs(30));
201+
})
202+
.await;
203+
}
221204

222205
self.node.task_manager.shutdown().await;
223206
Ok(())
@@ -236,7 +219,7 @@ impl LaunchedShardNode {
236219
/// manager and scheduler to the generic `ShardRpc` RPC handler.
237220
struct NodeShardProvider {
238221
manager: Arc<dyn ShardManager>,
239-
scheduler: ShardScheduler,
222+
handle: RuntimeHandle,
240223
chain_spec: Arc<ChainSpec>,
241224
}
242225

@@ -270,7 +253,7 @@ impl ShardProvider for NodeShardProvider {
270253

271254
fn schedule(&self, shard_id: ContractAddress) {
272255
if let Ok(shard) = self.manager.get(shard_id) {
273-
self.scheduler.schedule(shard);
256+
self.handle.schedule(shard);
274257
}
275258
}
276259

crates/sharding/Cargo.toml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
[package]
2+
name = "katana-sharding"
3+
edition.workspace = true
4+
license.workspace = true
5+
repository.workspace = true
6+
version.workspace = true
7+
8+
[dependencies]
9+
katana-chain-spec.workspace = true
10+
katana-core.workspace = true
11+
katana-db.workspace = true
12+
katana-executor.workspace = true
13+
katana-gas-price-oracle.workspace = true
14+
katana-pool.workspace = true
15+
katana-primitives.workspace = true
16+
katana-provider.workspace = true
17+
katana-rpc-server.workspace = true
18+
katana-rpc-types.workspace = true
19+
katana-rpc-api.workspace = true
20+
katana-tasks.workspace = true
21+
22+
anyhow.workspace = true
23+
parking_lot.workspace = true
24+
tracing.workspace = true

crates/sharding/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pub mod manager;
2+
pub mod runtime;
3+
pub mod scheduler;
4+
pub mod types;
5+
pub mod worker;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use katana_rpc_server::starknet::StarknetApiConfig;
1010
use katana_tasks::TaskSpawner;
1111
use parking_lot::RwLock;
1212

13-
use super::types::{Shard, ShardId};
13+
use crate::types::{Shard, ShardId};
1414

1515
/// Pluggable abstraction for shard storage and creation policy.
1616
///

0 commit comments

Comments
 (0)