Skip to content

Commit 485d966

Browse files
committed
Fix handler lifecycle race condition for shutdown receiver
The message-server handler's register() function needs a ShutdownReceiver, but the receiver was only stored during the async setup() method. If an actor called register() during its synchronous init function, setup() had not run yet. Additionally, setup() contained a tokio::select! that immediately took the receiver from the mutex, racing with register().

Changes:
- Add ShutdownController to HandlerContext for early subscription
- Make ShutdownController Clone-able with Arc<Mutex<>>
- Create ShutdownController early in build_actor_resources()
- Message-server handler now subscribes during setup_host_functions_composite()
- Simplified setup() to just wait for the registered_notify signal
1 parent 381508d commit 485d966

File tree

4 files changed

+128
-39
lines changed

4 files changed

+128
-39
lines changed

crates/theater-handler-message-server/src/lib.rs

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -569,40 +569,30 @@ impl Handler for MessageServerHandler
569569
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
570570
info!("Message server handler setup (passive mode)");
571571

572-
// Store the actor_handle and shutdown_receiver for use by register()
572+
// Store the actor_handle for use by register()
573573
{
574574
let mut handle_guard = self.actor_handle.lock().unwrap();
575575
*handle_guard = Some(actor_handle);
576576
}
577+
// Only store shutdown_receiver if not already set during setup_host_functions_composite()
578+
// The early receiver from setup_host_functions_composite() is what register() needs
577579
{
578580
let mut shutdown_guard = self.shutdown_receiver.lock().unwrap();
579-
*shutdown_guard = Some(shutdown_receiver);
581+
if shutdown_guard.is_none() {
582+
*shutdown_guard = Some(shutdown_receiver);
583+
}
584+
// If already set, the receiver passed here is not stored and is dropped when setup() returns; register() uses the one stored earlier
580585
}
581586

582-
let shutdown_receiver_arc = self.shutdown_receiver.clone();
583587
let registered_notify = self.registered_notify.clone();
584588

585-
// Wait for either:
586-
// 1. register() takes the receiver (notified via registered_notify)
587-
// 2. Shutdown signal received (if register() was never called)
589+
// Wait for register() to be called. Shutdown is not awaited here: if register() is never called, this future stays pending.
590+
// The shutdown receiver is left in the mutex for register() to take.
591+
// If the actor never calls register(), the receiver stays in the mutex
592+
// and will be dropped when the handler is dropped.
588593
Box::pin(async move {
589-
tokio::select! {
590-
// Wait for register() to take the receiver
591-
_ = registered_notify.notified() => {
592-
info!("Message server handler: registered, setup complete");
593-
}
594-
// Wait for shutdown if receiver wasn't taken
595-
_ = async {
596-
let receiver = {
597-
let mut guard = shutdown_receiver_arc.lock().unwrap();
598-
guard.take()
599-
};
600-
if let Some(rx) = receiver {
601-
rx.wait_for_shutdown().await;
602-
info!("Message server handler received shutdown signal");
603-
}
604-
} => {}
605-
}
594+
registered_notify.notified().await;
595+
info!("Message server handler: registered, setup complete");
606596
Ok(())
607597
})
608598
}
@@ -623,6 +613,16 @@ impl Handler for MessageServerHandler
623613
self.actor_id = Some(actor_id.clone());
624614
}
625615

616+
// Get a shutdown receiver early - this is needed by register() which may be
617+
// called before the async setup() runs. This fixes the race condition where
618+
// an actor calls register() immediately in init before setup() has stored
619+
// the shutdown_receiver.
620+
if let Some(shutdown_receiver) = ctx.subscribe_shutdown() {
621+
let mut guard = self.shutdown_receiver.lock().unwrap();
622+
*guard = Some(shutdown_receiver);
623+
info!("Message server handler: got shutdown receiver during host function setup");
624+
}
625+
626626
// Check if already satisfied
627627
if ctx.is_satisfied("theater:simple/message-server-host") {
628628
info!("theater:simple/message-server-host already satisfied, skipping");

crates/theater/src/actor/runtime.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -290,8 +290,11 @@ impl ActorRuntime {
290290
Some(Arc::new(RecordingInterceptor::new(chain.clone())))
291291
};
292292

293-
// Create a closure that sets up all handler host functions
294-
let mut handler_ctx = HandlerContext::new();
293+
// Create shutdown controller early so handlers can subscribe during setup
294+
let mut shutdown_controller = ShutdownController::new();
295+
296+
// Create handler context with shutdown controller access
297+
let mut handler_ctx = HandlerContext::with_shutdown_controller(shutdown_controller.clone());
295298
handler_ctx.actor_id = Some(id.clone());
296299
let handlers_for_setup = &mut handlers;
297300

@@ -480,9 +483,8 @@ impl ActorRuntime {
480483
*instance_guard = Some(actor_instance);
481484
}
482485

483-
// Set up handlers
486+
// Set up handlers - use the shutdown_controller created earlier
484487
let mut handler_tasks: Vec<JoinHandle<()>> = vec![];
485-
let mut shutdown_controller = ShutdownController::new();
486488
let handler_actor_handle = actor_handle.clone();
487489
debug!("Setting up {} handlers", handlers.len());
488490
for mut handler in handlers {

crates/theater/src/handler/mod.rs

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::chain::ChainEvent;
44
use crate::id::TheaterId;
55
use crate::pack_bridge::{HostLinkerBuilder, LinkerError, PackInstance, TypeHash};
66
use crate::config::actor_manifest::HandlerConfig;
7-
use crate::shutdown::ShutdownReceiver;
7+
use crate::shutdown::{ShutdownController, ShutdownReceiver};
88
use anyhow::Result;
99
use std::collections::HashSet;
1010
use std::future::Future;
@@ -17,22 +17,46 @@ use tokio::sync::RwLock;
1717
pub type SharedActorInstance = Arc<RwLock<Option<PackInstance>>>;
1818

1919
/// Context passed to handlers during setup, tracking which imports are already satisfied
20-
#[derive(Debug, Clone, Default)]
20+
/// and providing access to the shutdown controller for handlers that need it.
21+
#[derive(Debug, Clone)]
2122
pub struct HandlerContext {
2223
/// Set of imports that have already been registered by other handlers
2324
pub satisfied_imports: HashSet<String>,
2425
/// The actor ID for the actor being set up
2526
pub actor_id: Option<TheaterId>,
27+
/// Shutdown controller - handlers can subscribe to get shutdown signals
28+
pub shutdown_controller: Option<ShutdownController>,
29+
}
30+
31+
impl Default for HandlerContext {
32+
fn default() -> Self {
33+
Self::new()
34+
}
2635
}
2736

2837
impl HandlerContext {
2938
pub fn new() -> Self {
3039
Self {
3140
satisfied_imports: HashSet::new(),
3241
actor_id: None,
42+
shutdown_controller: None,
3343
}
3444
}
3545

46+
/// Create a new context with a shutdown controller
47+
pub fn with_shutdown_controller(shutdown_controller: ShutdownController) -> Self {
48+
Self {
49+
satisfied_imports: HashSet::new(),
50+
actor_id: None,
51+
shutdown_controller: Some(shutdown_controller),
52+
}
53+
}
54+
55+
/// Get a shutdown receiver from the controller, if available
56+
pub fn subscribe_shutdown(&mut self) -> Option<ShutdownReceiver> {
57+
self.shutdown_controller.as_mut().map(|c| c.subscribe())
58+
}
59+
3660
/// Check if an import is already satisfied
3761
pub fn is_satisfied(&self, import: &str) -> bool {
3862
self.satisfied_imports.contains(import)
@@ -155,6 +179,14 @@ impl HandlerRegistry {
155179
/// External handler crates can implement this trait and register their handlers
156180
/// with the Theater runtime without depending on the concrete `Handler` enum.
157181
///
182+
/// ## Handler Lifecycle
183+
///
184+
/// 1. `create_instance()` - Clone/create handler instance with optional config
185+
/// 2. `setup_host_functions_composite()` - Register host functions (sync, during instantiation)
186+
/// - HandlerContext provides shutdown_controller for handlers that need early access
187+
/// 3. `init()` - Synchronous critical initialization (called before actor can receive calls)
188+
/// 4. `run()` - Async runtime loop (spawned as background task)
189+
///
158190
/// ## Composite Migration
159191
///
160192
/// For handlers migrating to Composite's Graph ABI runtime, implement:
@@ -169,21 +201,61 @@ pub trait Handler: Send + Sync + 'static {
169201
/// with that config. Otherwise, clones the current instance.
170202
fn create_instance(&self, config: Option<&HandlerConfig>) -> Box<dyn Handler>;
171203

172-
/// Initialize the handler with actor references.
204+
/// Synchronous initialization called BEFORE the actor can receive any calls.
173205
///
174-
/// This is called once when the actor starts. Handlers should store the provided
175-
/// handles for later use but NOT start event loops here. Event loops should be
176-
/// triggered by explicit actor calls (e.g., `enable-input()` for terminal).
206+
/// This is the place for critical setup that must complete before host functions
207+
/// can be used. For example, storing actor handles that host functions depend on.
208+
///
209+
/// Note: If a handler needs a ShutdownReceiver for host functions, it should
210+
/// subscribe via `ctx.subscribe_shutdown()` during `setup_host_functions_composite()`.
211+
///
212+
/// This runs synchronously - do NOT do any async work here.
213+
/// Default implementation does nothing.
214+
fn init(
215+
&mut self,
216+
_actor_handle: ActorHandle,
217+
_actor_instance: SharedActorInstance,
218+
) {
219+
// Default: no-op
220+
}
221+
222+
/// Async runtime loop that runs for the handler's lifetime.
223+
///
224+
/// This is spawned as a background task AFTER init() completes and the actor
225+
/// is ready to receive calls. Use this for event loops, message consumption,
226+
/// or any long-running async operations.
177227
///
178228
/// The `event_rx` parameter receives chain events as they're recorded. Most handlers
179229
/// can ignore this, but ReplayHandler uses it for streaming hash verification.
230+
///
231+
/// Default implementation just waits for shutdown.
232+
fn run(
233+
&mut self,
234+
shutdown_receiver: ShutdownReceiver,
235+
_event_rx: broadcast::Receiver<ChainEvent>,
236+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
237+
Box::pin(async move {
238+
shutdown_receiver.wait_for_shutdown().await;
239+
Ok(())
240+
})
241+
}
242+
243+
/// Initialize and run the handler.
244+
///
245+
/// The runtime calls init() synchronously, then spawns run() as a background task.
246+
/// Most handlers should override init() and/or run() rather than this method.
180247
fn setup(
181248
&mut self,
182249
actor_handle: ActorHandle,
183250
actor_instance: SharedActorInstance,
184251
shutdown_receiver: ShutdownReceiver,
185252
event_rx: broadcast::Receiver<ChainEvent>,
186-
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
253+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
254+
// Call init synchronously first
255+
self.init(actor_handle, actor_instance);
256+
// Then return the run future
257+
self.run(shutdown_receiver, event_rx)
258+
}
187259

188260
/// Set up host functions for this handler (Composite Graph ABI runtime).
189261
///

crates/theater/src/shutdown.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::sync::{Arc, Mutex};
12
use std::time::Duration;
23
use tokio::sync::oneshot::{Receiver, Sender};
34
use tracing::debug;
@@ -34,31 +35,45 @@ pub enum ShutdownType {
3435
Force,
3536
}
3637

37-
/// Controller that can broadcast shutdown signals to multiple receivers
38+
/// Controller that can broadcast shutdown signals to multiple receivers.
39+
/// This type is Clone-able and all clones share the same subscriber list.
40+
#[derive(Clone)]
3841
pub struct ShutdownController {
39-
subscribers: Vec<Sender<ShutdownSignal>>,
42+
subscribers: Arc<Mutex<Vec<Sender<ShutdownSignal>>>>,
43+
}
44+
45+
impl std::fmt::Debug for ShutdownController {
46+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47+
f.debug_struct("ShutdownController")
48+
.field("subscriber_count", &self.subscribers.lock().unwrap().len())
49+
.finish()
50+
}
4051
}
4152

4253
impl ShutdownController {
4354
/// Create a new ShutdownController with no subscribers; call `subscribe()` to obtain a ShutdownReceiver
4455
pub fn new() -> Self {
4556
Self {
46-
subscribers: Vec::new(),
57+
subscribers: Arc::new(Mutex::new(Vec::new())),
4758
}
4859
}
4960

5061
/// Get a new receiver for this controller
5162
pub fn subscribe(&mut self) -> ShutdownReceiver {
5263
let (sender, receiver) = tokio::sync::oneshot::channel();
53-
self.subscribers.push(sender);
64+
self.subscribers.lock().unwrap().push(sender);
5465
ShutdownReceiver { receiver }
5566
}
5667

5768
/// Signal all receivers to shutdown
5869
pub async fn signal_shutdown(self, shutdown_type: ShutdownType) {
5970
debug!("Signaling shutdown to all subscribers");
71+
let subscribers = {
72+
let mut guard = self.subscribers.lock().unwrap();
73+
std::mem::take(&mut *guard)
74+
};
6075
let mut receivers = Vec::new();
61-
for sender in self.subscribers {
76+
for sender in subscribers {
6277
let (responder, receiver) = tokio::sync::oneshot::channel();
6378
receivers.push(receiver);
6479
match sender.send(ShutdownSignal {

0 commit comments

Comments
 (0)