Skip to content

Commit 980dd0a

Browse files
committed
fix: set failure reason regardless of crash policy
1 parent 3da23f7 commit 980dd0a

File tree

3 files changed

+42
-38
lines changed

3 files changed

+42
-38
lines changed

engine/packages/gasoline/src/db/kv/debug.rs

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -660,14 +660,14 @@ impl DatabaseDebug for DatabaseKv {
660660
let output_subspace = self.subspace.subspace(&output_key);
661661

662662
let (
663-
workflow_name,
663+
workflow_name_entry,
664664
is_running,
665665
has_wake_condition,
666666
is_silenced,
667667
has_output,
668668
error,
669669
) = tokio::try_join!(
670-
tx.read(&name_key, Serializable),
670+
tx.read_opt(&name_key, Serializable),
671671
tx.exists(&worker_id_key, Serializable),
672672
tx.exists(&has_wake_condition_key, Serializable),
673673
tx.exists(&silence_ts_key, Serializable),
@@ -688,6 +688,11 @@ impl DatabaseDebug for DatabaseKv {
688688
tx.read_opt(&error_key, Serializable),
689689
)?;
690690

691+
let Some(workflow_name) = workflow_name_entry else {
692+
tracing::warn!("workflow {workflow_id} does not exist");
693+
continue;
694+
};
695+
691696
if is_running || is_silenced {
692697
continue;
693698
}

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 35 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -877,32 +877,46 @@ async fn handle_stopped(
877877
tracing::debug!(?variant, "actor stopped");
878878

879879
let force_reschedule = match &variant {
880-
StoppedVariant::Normal { code, .. } => {
880+
StoppedVariant::Normal {
881+
code: protocol::mk2::StopCode::Ok,
882+
..
883+
} => {
881884
// Reset retry count on successful exit
882-
if let protocol::mk2::StopCode::Ok = code {
883-
state.reschedule_state = Default::default();
884-
}
885+
state.reschedule_state = Default::default();
886+
887+
false
888+
}
889+
StoppedVariant::Normal {
890+
code: protocol::mk2::StopCode::Error,
891+
message,
892+
} => {
893+
ctx.v(3)
894+
.activity(runtime::SetFailureReasonInput {
895+
failure_reason: FailureReason::Crashed {
896+
message: message.clone(),
897+
},
898+
})
899+
.await?;
885900

886901
false
887902
}
888903
StoppedVariant::Lost {
889-
force_reschedule, ..
890-
} => *force_reschedule,
891-
};
904+
force_reschedule,
905+
failure_reason,
906+
} => {
907+
// Set runner failure reason if actor was lost unexpectedly.
908+
// This is set early (before crash policy handling) because it applies to all crash policies.
909+
if let Some(failure_reason) = &failure_reason {
910+
ctx.v(3)
911+
.activity(runtime::SetFailureReasonInput {
912+
failure_reason: failure_reason.clone(),
913+
})
914+
.await?;
915+
}
892916

893-
// Set runner failure reason if actor was lost unexpectedly.
894-
// This is set early (before crash policy handling) because it applies to all crash policies.
895-
if let StoppedVariant::Lost {
896-
failure_reason: Some(failure_reason),
897-
..
898-
} = &variant
899-
{
900-
ctx.v(3)
901-
.activity(runtime::SetFailureReasonInput {
902-
failure_reason: failure_reason.clone(),
903-
})
904-
.await?;
905-
}
917+
*force_reschedule
918+
}
919+
};
906920

907921
// Clear stop gc timeout to prevent being marked as lost in the lifecycle loop
908922
state.gc_timeout_ts = None;
@@ -1056,21 +1070,7 @@ async fn handle_stopped(
10561070

10571071
state.sleeping = true;
10581072

1059-
// Set Crashed failure reason for actual crashes.
1060-
// Runner failure reasons are already set at the start of handle_stopped.
1061-
if let StoppedVariant::Normal {
1062-
code: protocol::mk2::StopCode::Error,
1063-
message,
1064-
} = &variant
1065-
{
1066-
ctx.v(3)
1067-
.activity(runtime::SetFailureReasonInput {
1068-
failure_reason: FailureReason::Crashed {
1069-
message: message.clone(),
1070-
},
1071-
})
1072-
.await?;
1073-
}
1073+
ctx.removed::<Activity<runtime::SetFailureReason>>().await?;
10741074

10751075
ctx.activity(runtime::SetSleepingInput {
10761076
actor_id: input.actor_id,

engine/packages/pegboard/src/workflows/actor/runtime.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1128,7 +1128,6 @@ pub async fn set_started(ctx: &ActivityCtx, input: &SetStartedInput) -> Result<(
11281128
state.start_ts = Some(util::timestamp::now());
11291129
}
11301130
state.connectable_ts = Some(util::timestamp::now());
1131-
state.failure_reason = None;
11321131

11331132
ctx.udb()?
11341133
.run(|tx| async move {

0 commit comments

Comments
 (0)