Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions engine/packages/gasoline/src/db/kv/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,14 +660,14 @@ impl DatabaseDebug for DatabaseKv {
let output_subspace = self.subspace.subspace(&output_key);

let (
workflow_name,
workflow_name_entry,
is_running,
has_wake_condition,
is_silenced,
has_output,
error,
) = tokio::try_join!(
tx.read(&name_key, Serializable),
tx.read_opt(&name_key, Serializable),
tx.exists(&worker_id_key, Serializable),
tx.exists(&has_wake_condition_key, Serializable),
tx.exists(&silence_ts_key, Serializable),
Expand All @@ -688,6 +688,11 @@ impl DatabaseDebug for DatabaseKv {
tx.read_opt(&error_key, Serializable),
)?;

let Some(workflow_name) = workflow_name_entry else {
tracing::warn!("workflow {workflow_id} does not exist");
continue;
};

if is_running || is_silenced {
continue;
}
Expand Down
70 changes: 35 additions & 35 deletions engine/packages/pegboard/src/workflows/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -877,32 +877,46 @@ async fn handle_stopped(
tracing::debug!(?variant, "actor stopped");

let force_reschedule = match &variant {
StoppedVariant::Normal { code, .. } => {
StoppedVariant::Normal {
code: protocol::mk2::StopCode::Ok,
..
} => {
// Reset retry count on successful exit
if let protocol::mk2::StopCode::Ok = code {
state.reschedule_state = Default::default();
}
state.reschedule_state = Default::default();

false
}
StoppedVariant::Normal {
code: protocol::mk2::StopCode::Error,
message,
} => {
ctx.v(3)
.activity(runtime::SetFailureReasonInput {
failure_reason: FailureReason::Crashed {
message: message.clone(),
},
})
.await?;

false
}
StoppedVariant::Lost {
force_reschedule, ..
} => *force_reschedule,
};
force_reschedule,
failure_reason,
} => {
// Set runner failure reason if actor was lost unexpectedly.
// This is set early (before crash policy handling) because it applies to all crash policies.
if let Some(failure_reason) = &failure_reason {
ctx.v(3)
.activity(runtime::SetFailureReasonInput {
failure_reason: failure_reason.clone(),
})
.await?;
}

// Set runner failure reason if actor was lost unexpectedly.
// This is set early (before crash policy handling) because it applies to all crash policies.
if let StoppedVariant::Lost {
failure_reason: Some(failure_reason),
..
} = &variant
{
ctx.v(3)
.activity(runtime::SetFailureReasonInput {
failure_reason: failure_reason.clone(),
})
.await?;
}
*force_reschedule
}
};

// Clear stop gc timeout to prevent being marked as lost in the lifecycle loop
state.gc_timeout_ts = None;
Expand Down Expand Up @@ -1056,21 +1070,7 @@ async fn handle_stopped(

state.sleeping = true;

// Set Crashed failure reason for actual crashes.
// Runner failure reasons are already set at the start of handle_stopped.
if let StoppedVariant::Normal {
code: protocol::mk2::StopCode::Error,
message,
} = &variant
{
ctx.v(3)
.activity(runtime::SetFailureReasonInput {
failure_reason: FailureReason::Crashed {
message: message.clone(),
},
})
.await?;
}
ctx.removed::<Activity<runtime::SetFailureReason>>().await?;

ctx.activity(runtime::SetSleepingInput {
actor_id: input.actor_id,
Expand Down
1 change: 0 additions & 1 deletion engine/packages/pegboard/src/workflows/actor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1128,7 +1128,6 @@ pub async fn set_started(ctx: &ActivityCtx, input: &SetStartedInput) -> Result<(
state.start_ts = Some(util::timestamp::now());
}
state.connectable_ts = Some(util::timestamp::now());
state.failure_reason = None;

ctx.udb()?
.run(|tx| async move {
Expand Down
Loading