diff --git a/engine/packages/gasoline/src/db/kv/debug.rs b/engine/packages/gasoline/src/db/kv/debug.rs index b4b214cccf..885b6fa545 100644 --- a/engine/packages/gasoline/src/db/kv/debug.rs +++ b/engine/packages/gasoline/src/db/kv/debug.rs @@ -660,14 +660,14 @@ impl DatabaseDebug for DatabaseKv { let output_subspace = self.subspace.subspace(&output_key); let ( - workflow_name, + workflow_name_entry, is_running, has_wake_condition, is_silenced, has_output, error, ) = tokio::try_join!( - tx.read(&name_key, Serializable), + tx.read_opt(&name_key, Serializable), tx.exists(&worker_id_key, Serializable), tx.exists(&has_wake_condition_key, Serializable), tx.exists(&silence_ts_key, Serializable), @@ -688,6 +688,11 @@ impl DatabaseDebug for DatabaseKv { tx.read_opt(&error_key, Serializable), )?; + let Some(workflow_name) = workflow_name_entry else { + tracing::warn!("workflow {workflow_id} does not exist"); + continue; + }; + if is_running || is_silenced { continue; } diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index f1ed5da03a..193feb3faf 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ -877,32 +877,46 @@ async fn handle_stopped( tracing::debug!(?variant, "actor stopped"); let force_reschedule = match &variant { - StoppedVariant::Normal { code, .. } => { + StoppedVariant::Normal { + code: protocol::mk2::StopCode::Ok, + .. + } => { // Reset retry count on successful exit - if let protocol::mk2::StopCode::Ok = code { - state.reschedule_state = Default::default(); - } + state.reschedule_state = Default::default(); + + false + } + StoppedVariant::Normal { + code: protocol::mk2::StopCode::Error, + message, + } => { + ctx.v(3) + .activity(runtime::SetFailureReasonInput { + failure_reason: FailureReason::Crashed { + message: message.clone(), + }, + }) + .await?; false } StoppedVariant::Lost { - force_reschedule, .. - } => *force_reschedule, - }; + force_reschedule, + failure_reason, + } => { + // Set runner failure reason if actor was lost unexpectedly. + // This is set early (before crash policy handling) because it applies to all crash policies. + if let Some(failure_reason) = &failure_reason { + ctx.v(3) + .activity(runtime::SetFailureReasonInput { + failure_reason: failure_reason.clone(), + }) + .await?; + } - // Set runner failure reason if actor was lost unexpectedly. - // This is set early (before crash policy handling) because it applies to all crash policies. - if let StoppedVariant::Lost { - failure_reason: Some(failure_reason), - .. - } = &variant - { - ctx.v(3) - .activity(runtime::SetFailureReasonInput { - failure_reason: failure_reason.clone(), - }) - .await?; - } + *force_reschedule + } + }; // Clear stop gc timeout to prevent being marked as lost in the lifecycle loop state.gc_timeout_ts = None; @@ -1056,21 +1070,7 @@ async fn handle_stopped( state.sleeping = true; - // Set Crashed failure reason for actual crashes. - // Runner failure reasons are already set at the start of handle_stopped. - if let StoppedVariant::Normal { - code: protocol::mk2::StopCode::Error, - message, - } = &variant - { - ctx.v(3) - .activity(runtime::SetFailureReasonInput { - failure_reason: FailureReason::Crashed { - message: message.clone(), - }, - }) - .await?; - } + ctx.removed::>().await?; ctx.activity(runtime::SetSleepingInput { actor_id: input.actor_id, diff --git a/engine/packages/pegboard/src/workflows/actor/runtime.rs b/engine/packages/pegboard/src/workflows/actor/runtime.rs index 6417d442a5..31abf9998d 100644 --- a/engine/packages/pegboard/src/workflows/actor/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor/runtime.rs @@ -1128,7 +1128,6 @@ pub async fn set_started(ctx: &ActivityCtx, input: &SetStartedInput) -> Result<( state.start_ts = Some(util::timestamp::now()); } state.connectable_ts = Some(util::timestamp::now()); - state.failure_reason = None; ctx.udb()? .run(|tx| async move {