Skip to content

Commit 2760b20

Browse files
dulinriley, meta-codesync[bot]
authored and committed
Change ActorStatus to Stopping, Stopped, and Failed only once (#1889)
Summary: Pull Request resolved: #1889 Previously, both `Instance::run` and `Instance::run_actor_tree` would try to change the ActorStatus to Stopping and Stopped. Going back and forth from a terminal state could introduce timing errors in status watchers, which wait for Stopped or Failed. Instead, make each of these transitions exactly once, in run_actor_tree, after all child actors are stopped and cleanup has run. Added an assert to `change_status` to ensure this is the case. Also disallow changing from Stopped -> Failed: actors should only have a single terminal state. Reviewed By: mariusae Differential Revision: D87082404 fbshipit-source-id: 34852a850a1a0382c6046e3f4c143481213afa21
1 parent 6757d53 commit 2760b20

File tree

2 files changed

+55
-32
lines changed

2 files changed

+55
-32
lines changed

hyperactor/src/proc.rs

Lines changed: 54 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -736,9 +736,7 @@ impl Proc {
736736
RealClock
737737
.timeout(
738738
timeout,
739-
root.wait_for(|state: &ActorStatus| {
740-
matches!(*state, ActorStatus::Stopped)
741-
}),
739+
root.wait_for(|state: &ActorStatus| state.is_terminal()),
742740
)
743741
.await
744742
.ok()
@@ -1073,6 +1071,21 @@ impl<A: Actor> Instance<A> {
10731071
#[track_caller]
10741072
fn change_status(&self, new: ActorStatus) {
10751073
let old = self.inner.status_tx.send_replace(new.clone());
1074+
// 2 cases are allowed:
1075+
// * non-terminal -> non-terminal
1076+
// * non-terminal -> terminal
1077+
// terminal -> terminal is not allowed unless it is the same status (no-op).
1078+
// terminal -> non-terminal is never allowed.
1079+
assert!(
1080+
!old.is_terminal() && !new.is_terminal()
1081+
|| !old.is_terminal() && new.is_terminal()
1082+
|| old == new,
1083+
"actor changing status illegally, only allow non-terminal -> non-terminal \
1084+
and non-terminal -> terminal statuses. actor_id={}, prev_status={}, status={}",
1085+
self.self_id(),
1086+
old,
1087+
new
1088+
);
10761089
// Actor status changes between Idle and Processing when handling every
10771090
// message. It creates too many logs if we want to log these 2 states.
10781091
// Therefore we skip the status changes between them.
@@ -1098,8 +1111,8 @@ impl<A: Actor> Instance<A> {
10981111
self.inner.status_tx.borrow().is_terminal()
10991112
}
11001113

1101-
fn is_stopped(&self) -> bool {
1102-
self.inner.status_tx.borrow().is_stopped()
1114+
fn is_stopping(&self) -> bool {
1115+
self.inner.status_tx.borrow().is_stopping()
11031116
}
11041117

11051118
/// This instance's actor ID.
@@ -1197,10 +1210,12 @@ impl<A: Actor> Instance<A> {
11971210
let result = self
11981211
.run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
11991212
.await;
1213+
1214+
assert!(self.is_stopping());
12001215
let event = match result {
12011216
Ok(_) => {
1202-
// actor should have been stopped by run_actor_tree
1203-
assert!(self.is_stopped());
1217+
// success exit case
1218+
self.change_status(ActorStatus::Stopped);
12041219
None
12051220
}
12061221
Err(err) => {
@@ -1296,15 +1311,10 @@ impl<A: Actor> Instance<A> {
12961311
}
12971312
};
12981313

1299-
match &result {
1300-
Ok(_) => assert!(self.is_stopped()),
1301-
Err(err) => {
1302-
tracing::error!("{}: actor failure: {}", self.self_id(), err);
1303-
assert!(!self.is_terminal());
1304-
// Send Stopping instead of Failed, because we still need to
1305-
// unlink child actors.
1306-
self.change_status(ActorStatus::Stopping);
1307-
}
1314+
assert!(!self.is_terminal());
1315+
self.change_status(ActorStatus::Stopping);
1316+
if let Err(err) = &result {
1317+
tracing::error!("{}: actor failure: {}", self.self_id(), err);
13081318
}
13091319

13101320
// After this point, we know we won't spawn any more children,
@@ -1447,7 +1457,6 @@ impl<A: Actor> Instance<A> {
14471457
}
14481458

14491459
if need_drain {
1450-
self.change_status(ActorStatus::Stopping);
14511460
let mut n = 0;
14521461
while let Ok(work) = work_rx.try_recv() {
14531462
if let Err(err) = work.handle(actor, self).await {
@@ -1461,7 +1470,6 @@ impl<A: Actor> Instance<A> {
14611470
tracing::debug!("drained {} messages", n);
14621471
}
14631472
tracing::debug!("exited actor loop: {}", self.self_id());
1464-
self.change_status(ActorStatus::Stopped);
14651473
Ok(())
14661474
}
14671475

@@ -2148,6 +2156,7 @@ mod tests {
21482156
use hyperactor_macros::export;
21492157
use maplit::hashmap;
21502158
use serde_json::json;
2159+
use timed_test::async_timed_test;
21512160
use tokio::sync::Barrier;
21522161
use tokio::sync::oneshot;
21532162
use tracing::Level;
@@ -2283,7 +2292,7 @@ mod tests {
22832292
}
22842293

22852294
#[tracing_test::traced_test]
2286-
#[tokio::test]
2295+
#[async_timed_test(timeout_secs = 30)]
22872296
async fn test_spawn_actor() {
22882297
let proc = Proc::local();
22892298
let handle = proc.spawn::<TestActor>("test", ()).await.unwrap();
@@ -2332,7 +2341,7 @@ mod tests {
23322341
assert_matches!(*state.borrow(), ActorStatus::Stopped);
23332342
}
23342343

2335-
#[tokio::test]
2344+
#[async_timed_test(timeout_secs = 30)]
23362345
async fn test_proc_actors_messaging() {
23372346
let proc = Proc::local();
23382347
let first = proc.spawn::<TestActor>("first", ()).await.unwrap();
@@ -2365,7 +2374,7 @@ mod tests {
23652374
}
23662375
}
23672376

2368-
#[tokio::test]
2377+
#[async_timed_test(timeout_secs = 30)]
23692378
async fn test_actor_lookup() {
23702379
let proc = Proc::local();
23712380
let (client, _handle) = proc.instance("client").unwrap();
@@ -2426,7 +2435,7 @@ mod tests {
24262435
}
24272436

24282437
#[tracing_test::traced_test]
2429-
#[tokio::test]
2438+
#[async_timed_test(timeout_secs = 30)]
24302439
async fn test_spawn_child() {
24312440
let proc = Proc::local();
24322441

@@ -2474,17 +2483,27 @@ mod tests {
24742483
assert!(first.cell().inner.parent.upgrade().is_none());
24752484

24762485
// Supervision tree is torn down correctly.
2486+
// Once each actor is stopped, it should have no linked children.
2487+
let third_cell = third.cell().clone();
24772488
third.drain_and_stop().unwrap();
24782489
third.await;
2479-
assert!(second.cell().inner.children.is_empty());
2490+
assert!(third_cell.inner.children.is_empty());
2491+
drop(third_cell);
24802492
validate_link(second.cell(), first.cell());
24812493

2494+
let second_cell = second.cell().clone();
24822495
second.drain_and_stop().unwrap();
24832496
second.await;
2484-
assert!(first.cell().inner.children.is_empty());
2497+
assert!(second_cell.inner.children.is_empty());
2498+
drop(second_cell);
2499+
2500+
let first_cell = first.cell().clone();
2501+
first.drain_and_stop().unwrap();
2502+
first.await;
2503+
assert!(first_cell.inner.children.is_empty());
24852504
}
24862505

2487-
#[tokio::test]
2506+
#[async_timed_test(timeout_secs = 30)]
24882507
async fn test_child_lifecycle() {
24892508
let proc = Proc::local();
24902509

@@ -2502,7 +2521,7 @@ mod tests {
25022521
}
25032522
}
25042523

2505-
#[tokio::test]
2524+
#[async_timed_test(timeout_secs = 30)]
25062525
async fn test_parent_failure() {
25072526
let proc = Proc::local();
25082527
// Need to set a supervision coordinator for this Proc because there will
@@ -2536,7 +2555,7 @@ mod tests {
25362555
assert_eq!(root_1.await, ActorStatus::Stopped);
25372556
}
25382557

2539-
#[tokio::test]
2558+
#[async_timed_test(timeout_secs = 30)]
25402559
async fn test_actor_ledger() {
25412560
async fn wait_until_idle(actor_handle: &ActorHandle<TestActor>) {
25422561
actor_handle
@@ -2600,6 +2619,8 @@ mod tests {
26002619

26012620
let root_1 = TestActor::spawn_child(&root).await;
26022621
wait_until_idle(&root_1).await;
2622+
// Wait until the root actor processes the message and is then idle again.
2623+
wait_until_idle(&root).await;
26032624
{
26042625
let snapshot = proc.state().ledger.snapshot();
26052626
assert_eq!(
@@ -2627,6 +2648,7 @@ mod tests {
26272648

26282649
let root_1_1 = TestActor::spawn_child(&root_1).await;
26292650
wait_until_idle(&root_1_1).await;
2651+
wait_until_idle(&root_1).await;
26302652
{
26312653
let snapshot = proc.state().ledger.snapshot();
26322654
assert_eq!(
@@ -2666,6 +2688,7 @@ mod tests {
26662688

26672689
let root_2 = TestActor::spawn_child(&root).await;
26682690
wait_until_idle(&root_2).await;
2691+
wait_until_idle(&root).await;
26692692
{
26702693
let snapshot = proc.state().ledger.snapshot();
26712694
assert_eq!(
@@ -2757,7 +2780,7 @@ mod tests {
27572780
}
27582781
}
27592782

2760-
#[tokio::test]
2783+
#[async_timed_test(timeout_secs = 30)]
27612784
async fn test_multi_handler() {
27622785
// TEMPORARY: This test is currently a bit awkward since we don't yet expose
27632786
// public interfaces to multi-handlers. This will be fixed shortly.
@@ -2816,7 +2839,7 @@ mod tests {
28162839
assert_eq!(state.load(Ordering::SeqCst), 123);
28172840
}
28182841

2819-
#[tokio::test]
2842+
#[async_timed_test(timeout_secs = 30)]
28202843
async fn test_actor_panic() {
28212844
// Need this custom hook to store panic backtrace in task_local.
28222845
panic_handler::set_panic_hook();
@@ -2849,7 +2872,7 @@ mod tests {
28492872
}
28502873
}
28512874

2852-
#[tokio::test]
2875+
#[async_timed_test(timeout_secs = 30)]
28532876
async fn test_local_supervision_propagation() {
28542877
#[derive(Debug)]
28552878
struct TestActor(Arc<AtomicBool>, bool);
@@ -2960,7 +2983,7 @@ mod tests {
29602983
);
29612984
}
29622985

2963-
#[tokio::test]
2986+
#[async_timed_test(timeout_secs = 30)]
29642987
async fn test_instance() {
29652988
#[derive(Debug, Default, Actor)]
29662989
struct TestActor;

hyperactor_mesh/src/proc_mesh/mesh_agent.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ impl MeshAgentMessageHandler for ProcMeshAgent {
401401
match RealClock
402402
.timeout(
403403
tokio::time::Duration::from_millis(timeout_ms),
404-
status.wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Stopped)),
404+
status.wait_for(|state: &ActorStatus| state.is_terminal()),
405405
)
406406
.await
407407
{

0 commit comments

Comments
 (0)