Skip to content

Commit 33e8961

Browse files
committed
fix(pb): have restart policy actors sleep after pending for too long
1 parent 9e4ae6f commit 33e8961

File tree

2 files changed

+27
-13
lines changed

2 files changed

+27
-13
lines changed

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ pub use runtime::AllocationOverride;
1414

1515
/// Batch size of how many events to ack.
1616
const EVENT_ACK_BATCH_SIZE: i64 = 250;
17+
/// How long (in milliseconds) an actor with `CrashPolicy::Restart` may stay pending allocation before it is put to sleep.
18+
const RESTART_PENDING_TIMEOUT_MS: i64 = util::duration::seconds(60);
1719

1820
#[derive(Clone, Debug, Serialize, Deserialize, Hash)]
1921
pub struct Input {
@@ -1053,13 +1055,19 @@ async fn handle_stopped(
10531055

10541056
match (input.crash_policy, graceful_exit) {
10551057
(CrashPolicy::Restart, false) => {
1056-
match runtime::reschedule_actor(ctx, &input, state, AllocationOverride::None)
1057-
.await?
1058+
match runtime::reschedule_actor(
1059+
ctx,
1060+
&input,
1061+
state,
1062+
AllocationOverride::PendingTimeout {
1063+
pending_timeout: RESTART_PENDING_TIMEOUT_MS,
1064+
},
1065+
)
1066+
.await?
10581067
{
10591068
runtime::SpawnActorOutput::Allocated { .. } => {}
1060-
// NOTE: Its not possible for `SpawnActorOutput::Sleep` to be returned here, the crash
1061-
// policy is `Restart`.
1062-
runtime::SpawnActorOutput::Sleep | runtime::SpawnActorOutput::Destroy => {
1069+
runtime::SpawnActorOutput::Sleep => {}
1070+
runtime::SpawnActorOutput::Destroy => {
10631071
// Destroyed early
10641072
return Ok(StoppedResult::Destroy);
10651073
}

engine/packages/pegboard/src/workflows/actor/runtime.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,9 @@ pub enum AllocationOverride {
585585
None,
586586
/// Forces actors with CrashPolicy::Sleep to pend instead of sleep.
587587
DontSleep { pending_timeout: Option<i64> },
588+
/// If an allocation attempt leaves the actor pending, the actor is put to sleep if it has still not
589+
/// been allocated once this timeout (in milliseconds) elapses.
590+
PendingTimeout { pending_timeout: i64 },
588591
}
589592

590593
#[derive(Debug)]
@@ -760,14 +763,17 @@ pub async fn spawn_actor(
760763
}
761764
}
762765

763-
let signal = if let AllocationOverride::DontSleep {
764-
pending_timeout: Some(timeout),
765-
} = allocation_override
766-
{
767-
ctx.listen_with_timeout::<PendingAllocation>(timeout)
768-
.await?
769-
} else {
770-
Some(ctx.listen::<PendingAllocation>().await?)
766+
let signal = match allocation_override {
767+
AllocationOverride::DontSleep {
768+
pending_timeout: Some(timeout),
769+
}
770+
| AllocationOverride::PendingTimeout {
771+
pending_timeout: timeout,
772+
} => {
773+
ctx.listen_with_timeout::<PendingAllocation>(timeout)
774+
.await?
775+
}
776+
_ => Some(ctx.listen::<PendingAllocation>().await?),
771777
};
772778

773779
// If allocation fails, the allocate txn already inserted this actor into the queue. Now we wait for

0 commit comments

Comments
 (0)