@@ -2,9 +2,11 @@ use std::sync::Arc;
22use std:: thread;
33use std:: time:: { Duration , Instant } ;
44
5- use crate :: scheduler:: ShardScheduler ;
5+ use anyhow:: Result ;
6+
7+ use crate :: scheduler:: Scheduler ;
68use crate :: types:: Shard ;
7- use crate :: worker;
9+ use crate :: worker:: Worker ;
810
911/// Owns the execution resources (scheduler + worker threads) for the shard node.
1012///
@@ -19,18 +21,18 @@ use crate::worker;
1921/// so workers detach immediately.
2022/// - **Drop** — if the runtime has not been consumed by one of the above methods, the `Drop` impl
2123/// signals the scheduler and blocks until all workers exit.
22- pub struct ShardRuntime {
24+ pub struct Runtime {
2325 handle : RuntimeHandle ,
2426 worker_count : usize ,
25- worker_handles : Vec < thread:: JoinHandle < ( ) > > ,
27+ worker_handles : Vec < thread:: JoinHandle < Result < ( ) > > > ,
2628}
2729
2830/// Cheap, cloneable reference for scheduling work on the [`ShardRuntime`].
2931///
3032/// Analogous to `tokio::runtime::Handle`.
3133#[ derive( Clone , Debug ) ]
3234pub struct RuntimeHandle {
33- scheduler : ShardScheduler ,
35+ scheduler : Scheduler ,
3436}
3537
3638// ---------------------------------------------------------------------------
@@ -44,7 +46,7 @@ impl RuntimeHandle {
4446 }
4547
4648 /// Returns a reference to the underlying scheduler.
47- pub fn scheduler ( & self ) -> & ShardScheduler {
49+ pub fn scheduler ( & self ) -> & Scheduler {
4850 & self . scheduler
4951 }
5052}
@@ -53,10 +55,10 @@ impl RuntimeHandle {
5355// ShardRuntime
5456// ---------------------------------------------------------------------------
5557
56- impl ShardRuntime {
58+ impl Runtime {
5759 /// Creates a new runtime. Workers are **not** spawned until [`start`](Self::start) is called.
5860 pub fn new ( worker_count : usize , time_quantum : Duration ) -> Self {
59- let scheduler = ShardScheduler :: new ( time_quantum) ;
61+ let scheduler = Scheduler :: new ( time_quantum) ;
6062 let handle = RuntimeHandle { scheduler } ;
6163 Self { handle, worker_count, worker_handles : Vec :: new ( ) }
6264 }
@@ -69,8 +71,21 @@ impl ShardRuntime {
6971 /// Spawns the worker threads. Must be called exactly once.
7072 pub fn start ( & mut self ) {
7173 assert ! ( self . worker_handles. is_empty( ) , "ShardRuntime::start called more than once" ) ;
72- self . worker_handles =
73- worker:: spawn_workers ( self . worker_count , self . handle . scheduler . clone ( ) ) ;
74+
75+ let total_workers = self . worker_count ;
76+ let scheduler_handle = self . handle . scheduler . clone ( ) ;
77+
78+ self . worker_handles = ( 0 ..total_workers)
79+ . map ( |worker_id| {
80+ let worker = Worker :: new ( worker_id, scheduler_handle. clone ( ) ) ;
81+ let worker_thread_name = format ! ( "shard-worker-{worker_id}" ) ;
82+
83+ std:: thread:: Builder :: new ( )
84+ . name ( worker_thread_name)
85+ . spawn ( move || worker. run ( ) )
86+ . expect ( "failed to spawn shard worker thread" )
87+ } )
88+ . collect :: < Vec < thread:: JoinHandle < Result < ( ) > > > > ( ) ;
7489 }
7590
7691 /// Signals the scheduler to shut down, then joins worker threads up to `duration`.
@@ -109,7 +124,7 @@ impl ShardRuntime {
109124 }
110125}
111126
112- impl Drop for ShardRuntime {
127+ impl Drop for Runtime {
113128 fn drop ( & mut self ) {
114129 if !self . worker_handles . is_empty ( ) {
115130 self . handle . scheduler . shutdown ( ) ;
@@ -120,7 +135,7 @@ impl Drop for ShardRuntime {
120135 }
121136}
122137
123- impl std:: fmt:: Debug for ShardRuntime {
138+ impl std:: fmt:: Debug for Runtime {
124139 fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
125140 f. debug_struct ( "ShardRuntime" )
126141 . field ( "worker_count" , & self . worker_count )
0 commit comments