diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index 193feb3faf..8951bc48d7 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ -14,6 +14,8 @@ pub use runtime::AllocationOverride; /// Batch size of how many events to ack. const EVENT_ACK_BATCH_SIZE: i64 = 250; +/// How long an actor with crash_policy Restart should wait pending before setting itself to sleep. +const RESTART_PENDING_TIMEOUT_MS: i64 = util::duration::seconds(60); #[derive(Clone, Debug, Serialize, Deserialize, Hash)] pub struct Input { @@ -1053,13 +1055,19 @@ async fn handle_stopped( match (input.crash_policy, graceful_exit) { (CrashPolicy::Restart, false) => { - match runtime::reschedule_actor(ctx, &input, state, AllocationOverride::None) - .await? + match runtime::reschedule_actor( + ctx, + &input, + state, + AllocationOverride::PendingTimeout { + pending_timeout: RESTART_PENDING_TIMEOUT_MS, + }, + ) + .await? { runtime::SpawnActorOutput::Allocated { .. } => {} - // NOTE: Its not possible for `SpawnActorOutput::Sleep` to be returned here, the crash - // policy is `Restart`. - runtime::SpawnActorOutput::Sleep | runtime::SpawnActorOutput::Destroy => { + runtime::SpawnActorOutput::Sleep => {} + runtime::SpawnActorOutput::Destroy => { // Destroyed early return Ok(StoppedResult::Destroy); } diff --git a/engine/packages/pegboard/src/workflows/actor/runtime.rs b/engine/packages/pegboard/src/workflows/actor/runtime.rs index 31abf9998d..e3337219d2 100644 --- a/engine/packages/pegboard/src/workflows/actor/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor/runtime.rs @@ -585,6 +585,9 @@ pub enum AllocationOverride { None, /// Forces actors with CrashPolicy::Sleep to pend instead of sleep. DontSleep { pending_timeout: Option }, + /// If an allocation results in pending, it will be put to sleep if it is not allocated after this + /// timeout. + PendingTimeout { pending_timeout: i64 }, } #[derive(Debug)] @@ -760,14 +763,17 @@ pub async fn spawn_actor( } } - let signal = if let AllocationOverride::DontSleep { - pending_timeout: Some(timeout), - } = allocation_override - { - ctx.listen_with_timeout::(timeout) - .await? - } else { - Some(ctx.listen::().await?) + let signal = match allocation_override { + AllocationOverride::DontSleep { + pending_timeout: Some(timeout), + } + | AllocationOverride::PendingTimeout { + pending_timeout: timeout, + } => { + ctx.listen_with_timeout::(timeout) + .await? + } + _ => Some(ctx.listen::().await?), }; // If allocation fails, the allocate txn already inserted this actor into the queue. Now we wait for