-
Notifications
You must be signed in to change notification settings - Fork 129
Description
Problem
When a Restate node restarts or gains new partition leaders, all pending invocations are re-invoked simultaneously, causing memory ballooning. The invoker has no global memory budget — it blindly loads journal entries, state, and notifications into memory and pushes data through unbounded channels without accounting.
There are two data paths that consume memory:
- Outbound path (RocksDB → invoker → hyper → service deployment): Journal entries, state entries, and notifications are loaded from storage and sent to the deployment.
- Inbound path (service deployment → hyper → decoder → invoker → Bifrost): Journal entries/commands from the SDK are received, decoded, and proposed to Bifrost as Effects.
Both paths need proper memory accounting with RAII-based memory leases that are only released when the memory is actually freed (e.g., when hyper consumes a frame or when an Effect is proposed to Bifrost).
Part of #4311.
Solution Overview
Introduce a single global MemoryPool for the invoker with per-invocation two-directional memory budgets (DirectionalBudget) that provide isolation between inbound and outbound data flows. Memory is tracked via RAII InvocationMemoryLease types that reclaim to the originating budget on drop.
Key Design Principles
- Never materialize data without a lease — use two-phase reads (peek at size via RocksDB pinnable slice → acquire lease → deserialize) from storage; check message header before reading inbound body.
- Per-invocation
DirectionalBudget— separate inbound and outbound sub-pools prevent one direction from starving the other, eliminating bidirectional deadlocks. - Smart wait-vs-yield when memory is unavailable:
- Wait if enough in-flight memory (being sent to deployment / replicated to Bifrost) will free up soon.
- Yield if fundamentally insufficient — return the invocation to the scheduler for re-evaluation.
- No deadlocks — separate inbound/outbound budgets, pre-read memory checks, and yield-on-exhaustion guarantee progress.
Core Types
DirectionalBudget
A reusable type parameterized at construction for either inbound or outbound use:
pub struct DirectionalBudget {
global_pool: MemoryPool, // shared invoker pool
local_available: AtomicUsize, // reclaimed + initial reservation, ready for reuse
in_flight: AtomicUsize, // split off into leases, will be reclaimed on drop
min_reserved: usize, // floor: never return below this to global pool
upper_bound: usize, // ceiling: max (local_available + in_flight)
notify: Notify, // wakes waiters when memory is reclaimed
}| Direction | min_reserved |
upper_bound |
|---|---|---|
| Inbound | 64 KiB (≈ 2 HTTP/2 frames) | max_message_size |
| Outbound | 0 | max_message_size |
Key operations:
try_acquire(size) -> Option<InvocationMemoryLease>— try local first, then global pool. Fails if would exceedupper_boundor global pool is empty.ensure_available(size).await— waits onnotifyuntillocal_available >= size. Used for inbound pre-read check.reclaim(size)— called byInvocationMemoryLease::drop(). Adds tolocal_available, wakes waiters.release_excess()— returnslocal_available - min_reservedback to global pool. Called periodically.should_yield(needed) -> bool— returnstrueifneeded > local_available + in_flight + global_pool.available().- Drop behavior: returns all
local_availableto the global pool when the invocation ends.
InvocationBudget
Groups both directions. Arc-wrapped for sharing between the invocation task and hyper's task.
pub struct InvocationBudget {
pub inbound: DirectionalBudget,
pub outbound: DirectionalBudget,
}InvocationMemoryLease
RAII guard. Reclaims to whichever DirectionalBudget it was allocated from on drop.
pub struct InvocationMemoryLease {
budget: Arc<DirectionalBudget>,
size: usize,
}Data Carriers
BytesWithLease(outbound): WrapsBytes+InvocationMemoryLeasefrom outbound budget. When hyper consumes the frame and drops it, memory reclaims tooutbound.local_available.EffectWithLease(inbound): WrapsBox<Effect>+InvocationMemoryLeasefrom inbound budget. When dropped after Bifrost proposal, memory reclaims toinbound.local_available.
Data Flow
Outbound Path
RocksDB iterator positions on entry
→ peek at pinned value size (v.len())
→ outbound_budget.try_acquire(size)
→ success: deserialize from pinned slice, release pin
→ fail + should_yield == false: wait (in-flight will free up)
→ fail + should_yield == true: yield invocation
→ encode into Bytes → wrap in BytesWithLease
→ send via unbounded http_stream_tx (no await)
→ hyper consumes → BytesWithLease dropped → outbound_budget.reclaim()
Inbound Path
Before polling HTTP response stream:
→ inbound_budget.ensure_available(http2_frame_size).await
(select! keeps outbound/timeout arms active while waiting)
→ poll HTTP response → push to decoder
→ decoder parses 8-byte header → pending_body_bytes() = message length
→ inbound_budget.try_acquire(message_length)
→ success: read body, decode message
→ fail: yield invocation (before reading body bytes)
→ process message → wrap in EffectWithLease
→ send via unbounded output_tx (no await)
→ leader_state → propose() → EffectWithLease dropped → inbound_budget.reclaim()
Segment Queue Dequeue Guard
Memory lease must be acquired before popping from the segment queue (avoids TOCTOU race):
- Try to acquire
min_invocation_budgetfrom the global pool (stored aspending_memory_lease) - Segment queue arm guard checks
pending_memory_lease.is_some() - On pop, take the lease and pass it to
InvocationBudgetconstructor as the inboundmin_reservedseed
Channel Changes
| Channel | Current | New | Reason |
|---|---|---|---|
http_stream_tx |
mpsc::channel(1) |
unbounded | Memory accounting replaces backpressure. Eliminates await. |
output_tx |
bounded Sender<Box<Effect>> |
unbounded Sender<EffectWithLease> |
Memory accounting replaces backpressure. Eliminates await in select! |
notifications_tx |
unbounded (data) | unbounded (signals only) | Carries entry indexes, not payloads |
Yield Mechanism
Wait: should_yield(needed) returns false — enough in-flight memory will be reclaimed. Await on budget.notify.
Yield: should_yield(needed) returns true — fundamentally insufficient memory.
- With vqueues:
EffectKind::Yield→ PP callsyield_running()→ entry re-enters inbox at highest priority (TokenHeld) - Without vqueues:
EffectKind::Suspended/SuspendedV2with empty waiting set → immediate re-invocation viaSegmentQueue
Deadlock Freedom
| Scenario | Mitigation |
|---|---|
| Outbound exhausted, need to read inbound | Separate inbound budget with guaranteed reservation |
| Inbound local depleted (in-flight to Bifrost) | ensure_available().await waits for reclaim; outbound select! arms stay active |
| Inbound message too large, global pool exhausted | Yield before reading body bytes; connection dropped; all memory freed |
| Bidirectional stall (both sides sending) | Independent budgets; await inbound reclaim while outbound continues; yield breaks stall |
| Global pool consumed, can't start new invocations | Dequeue guard prevents spawning; yielded invocations free budgets |
Prerequisites / Simplifications
Remove CachedJournal
The InvokeInputJournal::CachedJournal variant passes journal entries from the PP to the invoker in-memory. This complicates memory ownership. Removing it ensures all data is loaded from RocksDB within the invoker's memory budget. The performance impact is negligible (entry is in memtable/block cache).
Convert Notifications/Completions to Signal-Only
Completions (v1) and notifications (v2) carry full data payloads through unbounded channels. Since both are already persisted in RocksDB before forwarding, replace the data with signals (entry indexes). The invocation task reads from RocksDB on-demand using non-transactional point reads (via a cloned InvocationReader).
Memory-Aware Journal Stream
Modify JournalEntryIter to integrate with the outbound budget: position RocksDB iterator → peek v.len() via pinnable slice → acquire memory → deserialize → advance. The stream becomes async and yields Poll::Pending when waiting for memory.
Memory-Aware Inbound Decoder
Add pending_body_bytes() -> Option<usize> to both V1-V3 and V4 decoders (expose the WaitingPayload(header) state). The DecoderStream checks inbound memory before reading the message body.
PR Structure
PR 1: Remove CachedJournal + Foundational API Changes
Remove InvokeInputJournal, add non-transactional read_journal_entry() to InvocationReader trait.
PR 2: Convert Notifications/Completions to Signal-Only
Notification carries entry indexes only. Clone reader for bidi-phase point reads.
PR 3: MemoryPool + InvocationBudget + Outbound Path
Core types, memory-aware journal/state streams, BytesWithLease, unbounded http_stream_tx, HTTP/2 window config (32 KiB default), segment queue dequeue guard with lease.
PR 4: Inbound Path + Lease Threading to Bifrost
pending_body_bytes() on decoders, EffectWithLease, unbounded output_tx, inbound pre-read guard.
PR 5: Smart Wait/Yield
should_yield(), vqueue yield, suspension-based yield, memory requirement hints.
Configuration
| Config Key | Default | Description |
|---|---|---|
worker.invoker.memory-limit |
128-256 MiB | Global invoker memory pool capacity |
worker.invoker.per-invocation-upper-bound |
max_message_size |
Max memory per invocation per direction |
worker.invoker.inbound-min-reserved |
64 KiB | Guaranteed inbound reservation per invocation |
http.http2-initial-stream-window-size |
32 KiB | HTTP/2 per-stream receive window |
References
- Invoker memory management: Prevent memory ballooning during node restart/leader failover #4311 — Parent issue: Invoker memory management
- Add configurable eager state size limit to mitigate memory pressure #4345 — Eager state has no size limit
- Make journal and state reading lazy in the invoker via GATs #4346 — Lazy journal/state reading (merged)
- Remove high-water-mark arena from protocol Encoders #4365 — Remove encoder arena high-water-mark (merged)
- Make InvokerOptions.in_memory_queue_length_limit respect global memory limit of a node #2402 — Memory-aware invocation scheduling
The full detailed plan with file-level references is maintained in context/4354-invoker-memory-budget.md.