Skip to content

Commit 7e1ffb1

Browse files
committed
wip
1 parent ab50700 commit 7e1ffb1

File tree

11 files changed

+94
-161
lines changed

11 files changed

+94
-161
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/cli/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ katana-tracing.workspace = true
2020
katana-paymaster = { workspace = true, optional = true }
2121
katana-utils.workspace = true
2222
serde-utils.workspace = true
23-
cartridge = { path = "../cartridge", optional = true }
23+
cartridge = { workspace = true, optional = true }
2424

2525
alloy-primitives.workspace = true
2626
anyhow.workspace = true

crates/node/sequencer/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ katana-rpc-server.workspace = true
2323
katana-rpc-api.workspace = true
2424
katana-rpc-client.workspace = true
2525
katana-rpc-types.workspace = true
26-
katana-sharding.workspace = true
2726
katana-stage.workspace = true
2827
katana-tasks.workspace = true
2928
katana-tee = { workspace = true, optional = true }

crates/node/sequencer/src/config/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ pub mod tee;
1616
pub mod grpc;
1717

1818
use db::DbConfig;
19-
use dev::DevConfig;
2019
use fork::ForkingConfig;
2120
use gateway::GatewayConfig;
2221
#[cfg(feature = "grpc")]

crates/node/sharding/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,6 @@ jsonrpsee.workspace = true
3939
serde.workspace = true
4040
tracing.workspace = true
4141
url.workspace = true
42+
43+
[features]
44+
native = ["katana-executor/native", "katana-node-config/native"]

crates/node/sharding/src/lib.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use katana_rpc_server::shard::{ShardProvider, ShardRpc};
2020
use katana_rpc_server::starknet::{StarknetApi, StarknetApiConfig};
2121
use katana_rpc_server::{RpcServer, RpcServerHandle};
2222
use katana_sharding::manager::{LazyShardManager, ShardManager};
23-
use katana_sharding::runtime::{RuntimeHandle, ShardRuntime};
23+
use katana_sharding::runtime::{Runtime, RuntimeHandle};
2424
use katana_sharding::types::NoPendingBlockProvider;
2525
use katana_tasks::TaskManager;
2626
use parking_lot::Mutex;
@@ -38,7 +38,7 @@ pub struct Node {
3838
pub handle: RuntimeHandle,
3939
pub task_manager: TaskManager,
4040
pub rpc_server: RpcServer,
41-
pub runtime: Mutex<Option<ShardRuntime>>,
41+
pub runtime: Mutex<Option<Runtime>>,
4242
}
4343

4444
impl Node {
@@ -89,12 +89,10 @@ impl Node {
8989
max_concurrent_estimate_fee_requests: config.rpc.max_concurrent_estimate_fee_requests,
9090
simulation_flags: executor_factory.execution_flags().clone(),
9191
versioned_constant_overrides,
92-
#[cfg(feature = "cartridge")]
93-
paymaster: None,
9492
};
9593

9694
// --- Build runtime (scheduler + workers)
97-
let runtime = ShardRuntime::new(config.worker_count, config.time_quantum);
95+
let runtime = Runtime::new(config.worker_count, config.time_quantum);
9896
let handle = runtime.handle();
9997

10098
// --- Build shard manager

crates/sharding/src/manager.rs

Lines changed: 0 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -228,74 +228,3 @@ impl Debug for LazyShardManager {
228228
f.debug_struct("LazyShardManager").field("shard_count", &self.len()).finish_non_exhaustive()
229229
}
230230
}
231-
232-
#[cfg(test)]
233-
mod tests {
234-
use std::sync::Arc;
235-
236-
use katana_chain_spec::ChainSpec;
237-
use katana_executor::blockifier::cache::ClassCache;
238-
use katana_executor::blockifier::BlockifierFactory;
239-
use katana_executor::{BlockLimits, ExecutionFlags, ExecutorFactory};
240-
use katana_gas_price_oracle::GasPriceOracle;
241-
use katana_primitives::block::FinalityStatus;
242-
use katana_primitives::da::L1DataAvailabilityMode;
243-
use katana_primitives::env::BlockEnv;
244-
use katana_primitives::{address, Felt};
245-
use katana_rpc_server::starknet::StarknetApiConfig;
246-
use katana_rpc_types::block::{
247-
BlockWithTxHashes, GetBlockWithTxHashesResponse, PreConfirmedBlockWithTxHashes,
248-
};
249-
use katana_tasks::TaskManager;
250-
251-
use super::{block_env_from_latest_block_response, LazyShardManager, ShardManager};
252-
253-
#[test]
254-
fn rejects_invalid_starknet_version_from_latest_block() {
255-
let response = GetBlockWithTxHashesResponse::PreConfirmed(PreConfirmedBlockWithTxHashes {
256-
block_number: 7,
257-
timestamp: 1_234_567_890,
258-
sequencer_address: address!("0x1234"),
259-
l1_gas_price: resource_price(101, 102),
260-
l2_gas_price: resource_price(201, 202),
261-
l1_data_gas_price: resource_price(301, 302),
262-
l1_da_mode: L1DataAvailabilityMode::Blob,
263-
starknet_version: "not.a.version".to_owned(),
264-
transactions: vec![],
265-
});
266-
267-
let err = block_env_from_latest_block_response(response).expect_err("must fail");
268-
assert!(err.to_string().contains("invalid Starknet version"));
269-
}
270-
271-
#[tokio::test]
272-
async fn get_fails_and_does_not_register_shard_when_initial_env_fetch_fails() {
273-
let chain_spec = Arc::new(ChainSpec::dev());
274-
let manager = LazyShardManager::new_with_block_env_fetcher(
275-
chain_spec.clone(),
276-
test_executor_factory(chain_spec),
277-
GasPriceOracle::create_for_testing(),
278-
test_starknet_api_config(),
279-
TaskManager::current().task_spawner(),
280-
Arc::new(|| -> anyhow::Result<BlockEnv> { anyhow::bail!("base chain unavailable") }),
281-
);
282-
283-
let shard_id = address!("0x42");
284-
let err = manager.get(shard_id).expect_err("must fail");
285-
286-
assert!(err.to_string().contains("failed to generate initial block context"));
287-
assert!(manager.is_empty());
288-
assert!(manager.shard_ids().is_empty());
289-
}
290-
291-
fn test_executor_factory(chain_spec: Arc<ChainSpec>) -> Arc<dyn ExecutorFactory> {
292-
let class_cache = ClassCache::new().expect("class cache should initialize");
293-
Arc::new(BlockifierFactory::new(
294-
None,
295-
ExecutionFlags::new(),
296-
BlockLimits::default(),
297-
class_cache,
298-
chain_spec,
299-
))
300-
}
301-
}

crates/sharding/src/runtime.rs

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ use std::sync::Arc;
22
use std::thread;
33
use std::time::{Duration, Instant};
44

5-
use crate::scheduler::ShardScheduler;
5+
use anyhow::Result;
6+
7+
use crate::scheduler::Scheduler;
68
use crate::types::Shard;
7-
use crate::worker;
9+
use crate::worker::Worker;
810

911
/// Owns the execution resources (scheduler + worker threads) for the shard node.
1012
///
@@ -19,18 +21,18 @@ use crate::worker;
1921
/// so workers detach immediately.
2022
/// - **Drop** — if the runtime has not been consumed by one of the above methods, the `Drop` impl
2123
/// signals the scheduler and blocks until all workers exit.
22-
pub struct ShardRuntime {
24+
pub struct Runtime {
2325
handle: RuntimeHandle,
2426
worker_count: usize,
25-
worker_handles: Vec<thread::JoinHandle<()>>,
27+
worker_handles: Vec<thread::JoinHandle<Result<()>>>,
2628
}
2729

2830
/// Cheap, cloneable reference for scheduling work on the [`ShardRuntime`].
2931
///
3032
/// Analogous to `tokio::runtime::Handle`.
3133
#[derive(Clone, Debug)]
3234
pub struct RuntimeHandle {
33-
scheduler: ShardScheduler,
35+
scheduler: Scheduler,
3436
}
3537

3638
// ---------------------------------------------------------------------------
@@ -44,7 +46,7 @@ impl RuntimeHandle {
4446
}
4547

4648
/// Returns a reference to the underlying scheduler.
47-
pub fn scheduler(&self) -> &ShardScheduler {
49+
pub fn scheduler(&self) -> &Scheduler {
4850
&self.scheduler
4951
}
5052
}
@@ -53,10 +55,10 @@ impl RuntimeHandle {
5355
// ShardRuntime
5456
// ---------------------------------------------------------------------------
5557

56-
impl ShardRuntime {
58+
impl Runtime {
5759
/// Creates a new runtime. Workers are **not** spawned until [`start`](Self::start) is called.
5860
pub fn new(worker_count: usize, time_quantum: Duration) -> Self {
59-
let scheduler = ShardScheduler::new(time_quantum);
61+
let scheduler = Scheduler::new(time_quantum);
6062
let handle = RuntimeHandle { scheduler };
6163
Self { handle, worker_count, worker_handles: Vec::new() }
6264
}
@@ -69,8 +71,21 @@ impl ShardRuntime {
6971
/// Spawns the worker threads. Must be called exactly once.
7072
pub fn start(&mut self) {
7173
assert!(self.worker_handles.is_empty(), "ShardRuntime::start called more than once");
72-
self.worker_handles =
73-
worker::spawn_workers(self.worker_count, self.handle.scheduler.clone());
74+
75+
let total_workers = self.worker_count;
76+
let scheduler_handle = self.handle.scheduler.clone();
77+
78+
self.worker_handles = (0..total_workers)
79+
.map(|worker_id| {
80+
let worker = Worker::new(worker_id, scheduler_handle.clone());
81+
let worker_thread_name = format!("shard-worker-{worker_id}");
82+
83+
std::thread::Builder::new()
84+
.name(worker_thread_name)
85+
.spawn(move || worker.run())
86+
.expect("failed to spawn shard worker thread")
87+
})
88+
.collect::<Vec<thread::JoinHandle<Result<()>>>>();
7489
}
7590

7691
/// Signals the scheduler to shut down, then joins worker threads up to `duration`.
@@ -109,7 +124,7 @@ impl ShardRuntime {
109124
}
110125
}
111126

112-
impl Drop for ShardRuntime {
127+
impl Drop for Runtime {
113128
fn drop(&mut self) {
114129
if !self.worker_handles.is_empty() {
115130
self.handle.scheduler.shutdown();
@@ -120,7 +135,7 @@ impl Drop for ShardRuntime {
120135
}
121136
}
122137

123-
impl std::fmt::Debug for ShardRuntime {
138+
impl std::fmt::Debug for Runtime {
124139
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125140
f.debug_struct("ShardRuntime")
126141
.field("worker_count", &self.worker_count)

crates/sharding/src/scheduler.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::types::{Shard, ShardState};
1212
/// Workers block on [`next_task`](ShardScheduler::next_task) using a condvar,
1313
/// so they must run on dedicated OS threads (not async tasks).
1414
#[derive(Debug, Clone)]
15-
pub struct ShardScheduler {
15+
pub struct Scheduler {
1616
inner: Arc<SchedulerInner>,
1717
}
1818

@@ -23,7 +23,7 @@ struct SchedulerInner {
2323
shutdown: AtomicBool,
2424
}
2525

26-
impl ShardScheduler {
26+
impl Scheduler {
2727
pub fn new(time_quantum: Duration) -> Self {
2828
Self {
2929
inner: Arc::new(SchedulerInner {
@@ -50,13 +50,13 @@ impl ShardScheduler {
5050
pub fn next_task(&self) -> Option<Arc<Shard>> {
5151
let mut queue = self.inner.queue.lock();
5252

53-
if let Some(shard) = queue.pop_front() {
54-
return Some(shard);
53+
if let shard @ Some(..) = queue.pop_front() {
54+
shard
55+
} else {
56+
// Block until notified (new work or shutdown wake) or timeout.
57+
self.inner.condvar.wait_for(&mut queue, Duration::from_millis(100));
58+
queue.pop_front()
5559
}
56-
57-
// Block until notified (new work or shutdown wake) or timeout.
58-
self.inner.condvar.wait_for(&mut queue, Duration::from_millis(100));
59-
queue.pop_front()
6060
}
6161

6262
/// Returns the time quantum for worker preemption.

crates/sharding/src/types.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ use katana_executor::ExecutorFactory;
99
use katana_gas_price_oracle::GasPriceOracle;
1010
use katana_pool::ordering::FiFo;
1111
use katana_pool::validation::stateful::TxValidator;
12-
use katana_pool::TxPool;
12+
use katana_pool::{TransactionPool, TxPool};
1313
use katana_primitives::env::BlockEnv;
14-
use katana_primitives::transaction::TxHash;
14+
use katana_primitives::transaction::{ExecutableTxWithHash, TxHash};
1515
use katana_primitives::ContractAddress;
1616
use katana_provider::api::env::BlockEnvProvider;
1717
use katana_provider::api::state::{StateFactoryProvider, StateProvider};
@@ -211,4 +211,38 @@ impl Shard {
211211
.compare_exchange(expected as u8, new as u8, Ordering::SeqCst, Ordering::SeqCst)
212212
.is_ok()
213213
}
214+
215+
pub fn execute(&self) -> Result<()> {
216+
// // Collect pending transactions from the shard's pool
217+
let txs = self
218+
.pool
219+
.pending_transactions()
220+
.all
221+
.map(|ptx| (*ptx.tx).clone())
222+
.collect::<Vec<ExecutableTxWithHash>>();
223+
224+
if txs.is_empty() {
225+
return Ok(());
226+
}
227+
228+
let tx_hashes: Vec<_> = txs.iter().map(|tx| tx.hash).collect();
229+
let _tx_count = txs.len();
230+
231+
// // Read block env from the shard's own context
232+
let block_env = self.block_env.read().clone();
233+
234+
let state = self.provider.provider().latest()?;
235+
236+
let mut executor = self.backend.executor_factory.executor(state, block_env.clone());
237+
238+
executor.execute_transactions(txs)?;
239+
let output = executor.take_execution_output()?;
240+
241+
self.backend.do_mine_block(&block_env, output)?;
242+
243+
// Remove executed txs from pool
244+
self.pool.remove_transactions(&tx_hashes);
245+
246+
Ok(())
247+
}
214248
}

0 commit comments

Comments
 (0)