
feat(core): Add destination-controlled LSN flushing#626

Closed
flak153 wants to merge 1 commit into supabase:main from flak153:destination-controlled-lsn-flushing

Conversation


@flak153 flak153 commented Mar 7, 2026

Summary

Adds confirmed_flush_lsn() to the Destination trait so that asynchronous destinations can control exactly when flush_lsn is reported to PostgreSQL.

This is needed for destinations that cannot confirm data as durably processed at the moment write_events() returns — for example, a destination that buffers writes and only confirms after an external checkpoint. Without this, the apply loop auto-advances flush_lsn immediately after write_events(), which tells PostgreSQL it's safe to discard WAL that the destination hasn't actually persisted yet.

Problem

The current apply loop has a single LSN advancement path: after every batch flush, last_flush_lsn is unconditionally advanced to last_commit_end_lsn. This works for synchronous destinations where write_events() returning means the data is durable. But for asynchronous destinations:

  1. Data loss on restart: If the destination crashes between receiving events and confirming them, PostgreSQL has already discarded the WAL because flush_lsn was advanced.
  2. WAL buildup during idle: Without destination awareness, the system can't distinguish "idle because no new data" from "idle because destination is still processing." Keepalive responses advance flush_lsn to last_received_lsn regardless, which is incorrect when the destination has in-flight writes.

Approach

No tracking needed

The value chain is structurally guaranteed to be a commit boundary: CommitEvent.end_lsn flows through write_events() → destination records it → confirmed_flush_lsn() returns it. The value is never transformed, so there's nothing to validate against a tracking set. Tracking pending commit LSNs would also be expensive at high throughput — at 100K TPS with lag, it could hold millions of entries.

Instead, the contract is enforced through:

  • Documentation: The trait doc explicitly states the LSN must be a CommitEvent.end_lsn previously delivered via write_events().
  • debug_assert!: In development builds, poll_destination_flush() asserts confirmed_lsn <= last_received_lsn. This catches violations during testing.
  • std::cmp::min cap: In release builds, confirmed LSN is capped at last_received_lsn as a safety net, preventing any possible over-advance.
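The two enforcement layers can be sketched as follows. This is a hypothetical, simplified rendering of the described behavior, not the actual etl code: `Lsn` stands in for PgLsn, and `Progress` condenses the relevant ApplyLoop state into two fields.

```rust
type Lsn = u64;

pub struct Progress {
    pub last_received_lsn: Lsn,
    pub last_flush_lsn: Lsn,
}

impl Progress {
    /// Apply an LSN reported by the destination's confirmed_flush_lsn().
    pub fn apply_confirmed(&mut self, confirmed: Lsn) {
        // Development builds: confirming an LSN beyond what was received
        // violates the documented contract, so fail loudly.
        debug_assert!(
            confirmed <= self.last_received_lsn,
            "destination confirmed an LSN it was never sent"
        );
        // Release builds: cap at last_received_lsn as a safety net against
        // any possible over-advance.
        let capped = confirmed.min(self.last_received_lsn);
        // flush_lsn is monotonic; a lower confirmation never regresses it.
        self.last_flush_lsn = self.last_flush_lsn.max(capped);
    }
}
```

Note that the cap and the monotonicity guard compose: a misbehaving destination can at worst leave flush_lsn where it was, never move it past received WAL.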

API design

fn confirmed_flush_lsn(&self) -> Option<(PgLsn, bool)> { None }
  • Returns None by default → zero behavioral change for existing destinations (BigQuery, Iceberg, etc.)
  • PgLsn: The confirmed flush position. Must be a CommitEvent.end_lsn value.
  • bool (has_inflight_writes): Distinguishes "idle, nothing pending" (false) from "idle, still processing" (true). This matters for effective_flush_lsn() — when idle with no inflight writes, it's safe to advance flush_lsn to last_received_lsn (preventing WAL buildup). When idle with inflight writes, flush_lsn stays at the last confirmed position.
  • Synchronous method (not async) because it's called from the keepalive handler path where the destination should just read from an atomic/mutex, not do I/O.
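A destination implementing this contract might record progress in atomics so the synchronous read stays cheap. The sketch below is illustrative: `BufferedDestination`, its field names, and the `Lsn` alias are assumptions, not etl types.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

type Lsn = u64;

pub struct BufferedDestination {
    confirmed: AtomicU64, // last CommitEvent.end_lsn made durable (0 at startup)
    inflight: AtomicBool, // writes handed off but not yet checkpointed
}

impl BufferedDestination {
    /// Called by a background worker once an external checkpoint lands.
    pub fn mark_durable(&self, end_lsn: Lsn, still_inflight: bool) {
        self.confirmed.store(end_lsn, Ordering::Release);
        self.inflight.store(still_inflight, Ordering::Release);
    }

    /// Cheap, non-async read: safe to call from the keepalive handler path.
    pub fn confirmed_flush_lsn(&self) -> Option<(Lsn, bool)> {
        Some((
            self.confirmed.load(Ordering::Acquire),
            self.inflight.load(Ordering::Acquire),
        ))
    }
}
```

Always returning Some marks the destination as controlled; a legacy destination simply keeps the default None implementation.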

Where poll_destination_flush() is called

Three call sites, each serving a different purpose:

  1. process_syncing_tables_after_batch_flush() — After every batch of events is written to the destination. This is the primary advancement path during active streaming.
  2. Keepalive handler — During idle periods when only keepalives arrive. Without this, last_flush_lsn would stall even though the destination may have confirmed new progress. This also feeds into the existing process_syncing_tables_when_idle() call at line 892 of handle_replication_message_and_flush(), making table state transitions reactive during idle.
  3. initiate_graceful_shutdown() — Final poll before shutdown to capture any last confirmed progress, ensuring the shutdown status update reports the most accurate flush position.

effective_flush_lsn() moved from ApplyLoopState to ApplyLoop

Previously lived on ApplyLoopState and had no destination access — it just returned last_received_lsn when idle. Now it needs to check self.destination.confirmed_flush_lsn() to decide whether it's safe to advance, so it was moved to ApplyLoop<S, D>. The old method on ApplyLoopState was removed (it was the only dead code warning).
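The decision table described here can be sketched as a free function (a simplification of the actual method, with `Lsn` for PgLsn and the idle check passed in as a bool):

```rust
type Lsn = u64;

/// Decide which LSN to report to PostgreSQL in a standby status update.
fn effective_flush_lsn(
    is_idle: bool,                  // no open transaction and empty event batch
    confirmed: Option<(Lsn, bool)>, // destination's confirmed_flush_lsn()
    last_received_lsn: Lsn,
    last_flush_lsn: Lsn,
) -> Lsn {
    match (is_idle, confirmed) {
        // Legacy destination, idle: everything received has been written.
        (true, None) => last_received_lsn,
        // Controlled destination, idle, no in-flight writes: also safe to
        // advance to received, preventing WAL buildup.
        (true, Some((_, false))) => last_received_lsn,
        // In-flight writes or mid-transaction / non-empty batch: hold at the
        // last confirmed flush position.
        _ => last_flush_lsn,
    }
}
```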

Conditional auto-advance in process_syncing_tables_after_batch_flush()

For legacy destinations (confirmed_flush_lsn() returns None), the existing behavior is preserved: last_flush_lsn is auto-advanced to last_commit_end_lsn after each batch flush.

For controlled destinations (Some), auto-advance is skipped. poll_destination_flush() (called at the top of the method) handles advancement to whatever position the destination has confirmed. This means last_flush_lsn may lag behind last_commit_end_lsn, which is exactly the point — the destination hasn't confirmed that far yet.
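Condensed into one function for illustration (in the PR the two paths are split between poll_destination_flush() and the batch-flush method; `Lsn` stands in for PgLsn):

```rust
type Lsn = u64;

/// Advance last_flush_lsn after a batch flush, choosing the legacy or the
/// destination-controlled path.
fn advance_after_batch_flush(
    last_flush_lsn: &mut Lsn,
    last_commit_end_lsn: Option<Lsn>,
    destination_confirmed: Option<(Lsn, bool)>,
) {
    if let Some((confirmed, _inflight)) = destination_confirmed {
        // Controlled: advance only to what the destination has confirmed,
        // which may lag behind the batch's commit boundary.
        *last_flush_lsn = (*last_flush_lsn).max(confirmed);
        return;
    }
    // Legacy: auto-advance to the last commit boundary in this batch.
    if let Some(commit_end) = last_commit_end_lsn {
        *last_flush_lsn = (*last_flush_lsn).max(commit_end);
    }
}
```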

Changes

  • etl/src/destination/base.rs: add confirmed_flush_lsn() with a default None impl and contract documentation
  • etl/src/replication/apply.rs: add poll_destination_flush(), move effective_flush_lsn() to ApplyLoop, modify the batch flush / keepalive / shutdown paths, add 30 unit tests
  • etl/src/test_utils/test_destination_wrapper.rs: delegate confirmed_flush_lsn() to the wrapped destination

Test plan

30 unit tests added directly in apply.rs (first unit tests in this file), covering:

  • ReplicationProgress (4 tests): Monotonic advancement invariants, initial values, equal-update no-op
  • effective_flush_lsn (9 tests): All combinations of legacy/controlled × idle/in-transaction/non-empty-batch × inflight/no-inflight, all-zeros startup, poll-then-query interaction
  • poll_destination_flush (8 tests): Legacy no-op, advances to confirmed, caps at received, no regression on lower confirmed, multiple sequential advances, exact boundary, confirmed=0 startup, inflight bool ignored
  • process_syncing_tables_after_batch_flush (7 tests): Legacy auto-advance, controlled skip auto-advance, no commit LSN, polls-before-decision ordering, .take() consumes commit LSN, confirmed > commit LSN, legacy commit < current flush monotonic guarantee
  • debug_assert validation (1 test): #[should_panic] when confirmed exceeds received
  • Edge case interaction (1 test): Both in-transaction and non-empty batch simultaneously

All existing tests continue to pass (161 total including the 30 new ones). Integration tests are unaffected (they require TESTS_DATABASE_HOST).

Closes #621

Add `confirmed_flush_lsn()` method to the `Destination` trait, enabling
asynchronous destinations to control when the flush LSN is advanced to
PostgreSQL. This prevents WAL buildup for destinations that need to
durably process data before confirming (e.g., checkpointing systems).

Changes:
- Add `confirmed_flush_lsn(&self) -> Option<(PgLsn, bool)>` to Destination trait
  with strong contract docs (must return CommitEvent.end_lsn values only)
- Add `poll_destination_flush()` helper on ApplyLoop with debug_assert
  validation and min-cap safety net for release builds
- Move `effective_flush_lsn()` from ApplyLoopState to ApplyLoop to access
  destination — now considers in-flight write status when idle
- Modify `process_syncing_tables_after_batch_flush()` to skip auto-advance
  when destination controls flushing
- Poll destination in keepalive handler and graceful shutdown for reactive
  progress
- Delegate `confirmed_flush_lsn()` in TestDestinationWrapper
- Add 30 unit tests covering all new logic, edge cases, and interactions

Closes supabase#621
@flak153 flak153 requested a review from a team as a code owner March 7, 2026 00:32

coderabbitai bot commented Mar 7, 2026

📝 Walkthrough

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced replication progress tracking with destination-aware flushing to accurately reflect persisted data state and enable more timely progress updates.
  • Tests

    • Added comprehensive test coverage for destination-controlled flush behavior, progress monotonicity validation, and interaction scenarios across legacy and controlled destination modes.

Walkthrough

This PR introduces destination-controlled flush LSN tracking to etl's replication apply loop. A new confirmed_flush_lsn() method is added to the Destination trait, returning an optional LSN tuple that asynchronous destinations can use to report their durable processing progress. The apply loop now polls this method and uses the confirmed LSN to determine when to advance the replication progress, replacing the previous auto-advance behavior. Default implementations preserve backwards compatibility for existing destinations.

Sequence Diagram(s)

sequenceDiagram
    participant ApplyLoop
    participant Destination
    participant PostgreSQL

    loop Replication Apply Loop
        ApplyLoop->>Destination: write_events(batched_data)
        Destination-->>ApplyLoop: Ok(written)
        
        ApplyLoop->>ApplyLoop: poll_destination_flush()
        ApplyLoop->>Destination: confirmed_flush_lsn()
        Destination-->>ApplyLoop: Some((confirmed_lsn, inflight))
        ApplyLoop->>ApplyLoop: advance last_flush_lsn to confirmed_lsn
        
        ApplyLoop->>PostgreSQL: standby_status_update(confirmed_flush_lsn)
        PostgreSQL-->>ApplyLoop: ack
    end

Assessment against linked issues

  • Add optional confirmed_flush_lsn() method to Destination trait [#621] (the method signature includes a boolean tuple component, Option<(PgLsn, bool)>, rather than the simple Option<PgLsn> proposed; the purpose of the boolean flag is not documented in the issue context)
  • Use destination's confirmed LSN in standby_status_update [#621]
  • Preserve backwards compatibility with default None behavior [#621]

Out-of-scope changes

  • Return type Option<(PgLsn, bool)> instead of Option<PgLsn> (etl/src/destination/base.rs): issue #621 proposes a simple Option<PgLsn> return type; the boolean component in the tuple is not discussed or justified in the issue requirements.
  • Polling mechanism and poll_destination_flush() implementation (etl/src/replication/apply.rs, +861/-29): issue #621 describes a passive method that ApplyLoop calls during status updates; the active polling mechanism with destination flush progress tracking appears to be an implementation extension beyond the stated requirements.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@etl/src/replication/apply.rs`:
- Around line 1449-1458: After calling poll_destination_flush(), do not
unconditionally early-return when self.state.last_commit_end_lsn.take() is None;
instead determine an effective flush LSN to pass into
process_syncing_tables_after_batch_flush() by using the taken
last_commit_end_lsn if present or falling back to the destination-advanced flush
position (e.g. self.state.last_flush_lsn or whatever field
poll_destination_flush() updates) when last_commit_end_lsn is None, then call
process_syncing_tables_after_batch_flush(effective_lsn) before returning; update
the logic around poll_destination_flush(),
self.state.last_commit_end_lsn.take(), and the ApplyLoopAction::Continue path so
mid-transaction flushes that advance last_flush_lsn still trigger post-flush
sync processing.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Central YAML (base), Organization UI (inherited)

Review profile: CHILL

Plan: Pro

Run ID: 6091168e-a299-4dd9-9add-c6072b76608e

📥 Commits

Reviewing files that changed from the base of the PR and between 6e46731 and b18f82b.

📒 Files selected for processing (3)
  • etl/src/destination/base.rs
  • etl/src/replication/apply.rs
  • etl/src/test_utils/test_destination_wrapper.rs

Comment on lines +1449 to 1458
// Always poll destination first — even for mid-transaction flushes where
// last_commit_end_lsn is None, the destination may have confirmed previously
// sent transactions.
self.poll_destination_flush();

// Take the last commit end LSN, which is the highest LSN from any commit message in this
// batch. Batches can flush mid-transaction, so this may refer to the previous transaction.
let Some(last_commit_end_lsn) = self.state.last_commit_end_lsn.take() else {
return Ok(ApplyLoopAction::Continue);
};


⚠️ Potential issue | 🟠 Major

Don't skip post-flush sync processing when destination polling advanced last_flush_lsn.

After poll_destination_flush() runs, a controlled destination may already have advanced last_flush_lsn for an earlier committed transaction. The last_commit_end_lsn.take() early return on Line 1456 means mid-transaction flushes never pass that new flush position into process_syncing_tables_after_batch_flush(). In practice, a table-sync worker can stay stuck in Catchup/SyncDone until some later commit or idle period, even though the destination has already made the relevant commit durable.

Suggested fix
-        let Some(last_commit_end_lsn) = self.state.last_commit_end_lsn.take() else {
-            return Ok(ApplyLoopAction::Continue);
-        };
-
-        if self.destination.confirmed_flush_lsn().is_none() {
+        let last_commit_end_lsn = self.state.last_commit_end_lsn.take();
+        let destination_controls_flush = self.destination.confirmed_flush_lsn().is_some();
+
+        if !destination_controls_flush {
+            let Some(last_commit_end_lsn) = last_commit_end_lsn else {
+                return Ok(ApplyLoopAction::Continue);
+            };
             // Legacy: no destination control, auto-advance immediately after write_events().
             // Only reports progress up to the last completed transaction, which may cause
             // duplicates on restart for partial transactions. Destinations must handle
             // at-least-once delivery semantics.
             self.state
                 .replication_progress
                 .update_last_flush_lsn(last_commit_end_lsn);
         }
         // else: destination-controlled — poll_destination_flush() above already
         // advanced to the confirmed position. Don't auto-advance past it.

As per coding guidelines, "Comment only when the issue must be resolved before merge" and "Highlight only issues that could cause runtime errors, data loss, or severe maintainability issues."

Also applies to: 1460-1470


Author


This is a false positive — the early return on last_commit_end_lsn.take() being None is the pre-existing behavior and is correct.

When last_commit_end_lsn is None, it means no commit was in this batch (mid-transaction flush). There is no new commit boundary to process table syncs against. The poll_destination_flush() call at line 1452 still runs and advances last_flush_lsn — that state change is preserved regardless of the early return.

Table sync processing will pick up the advanced last_flush_lsn at the next opportunity:

  • process_syncing_tables_after_commit_event() — on the next commit event
  • process_syncing_tables_when_idle() — during idle periods (which already uses effective_flush_lsn())

The suggested fix would call process_syncing_tables_after_batch_flush() without a meaningful commit boundary, which is not useful — the worker dispatch functions need a commit position to determine if syncing tables have caught up.

@iambriccardo
Contributor

Thanks for the PR! Over the weekend, I did think about this problem and have a better solution in mind, which is also generic, meaning that it changes how the write method acks progress to the apply worker.

I might try to put up a PR for that and link it here. But I am pretty busy, so if you want, you can also try to experiment with the idea.

The higher-level design idea: instead of a hybrid approach, whenever we send a request to write events, the apply loop pairs the batch with an LSN + oneshot channel combination, stores the response oneshot channel + end LSN, and waits for the destination's response on the select! in the loop.

This way, we can have many oneshot channels each of them tied to an end LSN that will define the progress to track. So that destinations don't have to track any LSN, they can just say, “Oh, this batch was inserted, now let's confirm it.”

It's basically a callback mechanism that is much more ergonomic and easier to reason about.

Then the logic for idle progress tracking becomes just: if there is no active transaction, no data in the batch, and no response channels waiting, we use the write_lsn as the effective_flush_lsn.

Let me know if you need any clarifications!

@flak153
Author

flak153 commented Mar 9, 2026

Thanks for the feedback! I really like this direction — the callback approach is fundamentally better than what we have. Let me walk through how I understand it and flag one design question.

How I understand the proposal

Instead of the destination polling model (confirmed_flush_lsn() returning LSN + inflight status), the apply loop pushes confirmation control to the destination via oneshot channels:

Current (this PR, poll-based):
  apply loop → write_events(batch) → [later] polls confirmed_flush_lsn() → advances LSN

Proposed (callback-based):
  apply loop → write_events(batch, oneshot_tx) → stores (end_lsn, oneshot_rx)
  destination → [when ready] fires oneshot_tx
  apply loop select! { confirmed = pending_rx => advance last_flush_lsn to end_lsn }

The key win is inversion of control: the destination doesn't track LSNs at all. It just receives a callback and fires it when the batch is durably processed. The apply loop owns all LSN bookkeeping.

Trait change

The confirmed_flush_lsn() method goes away. Instead, write_events gains a confirmation channel:

fn write_events(
    &self,
    events: Vec<Event>,
    confirm: oneshot::Sender<()>,
) -> impl Future<Output = EtlResult<()>> + Send;

Legacy destinations confirm immediately:

async fn write_events(&self, events: Vec<Event>, confirm: oneshot::Sender<()>) -> EtlResult<()> {
    // ... process events ...
    let _ = confirm.send(());  // confirm inline
    Ok(())
}

Async destinations (like Feldera) defer:

async fn write_events(&self, events: Vec<Event>, confirm: oneshot::Sender<()>) -> EtlResult<()> {
    self.work_queue.send((events, confirm)).await?;  // hand off to background worker
    Ok(())  // return immediately
}

Apply loop changes

The select! loop gains a 5th branch for confirmations:

// New state in ApplyLoop:
pending_confirmations: BTreeMap<PgLsn, oneshot::Receiver<()>>,

// In send_batch_to_destination:
let (tx, rx) = oneshot::channel();
let end_lsn = self.state.last_commit_end_lsn.unwrap_or(current_lsn);
self.destination.write_events(events_batch, tx).await?;
self.pending_confirmations.insert(end_lsn, rx);

// New select! branch:
Some((confirmed_lsn, _)) = poll_next_confirmation(&mut self.pending_confirmations) => {
    self.state.replication_progress.update_last_flush_lsn(confirmed_lsn);
}

Idle progress tracking replacement

Current: has_inflight boolean from confirmed_flush_lsn()
Proposed: pending_confirmations.is_empty()

fn effective_flush_lsn(&self) -> PgLsn {
    let is_idle = !self.state.handling_transaction()
        && self.state.events_batch.is_empty();

    if is_idle && self.pending_confirmations.is_empty() {
        self.state.replication_progress.last_received_lsn
    } else {
        self.state.replication_progress.last_flush_lsn
    }
}

This is cleaner — the legacy vs controlled distinction disappears entirely. All destinations use the same code path; the only difference is when they fire the oneshot.

What this also solves

The current code has this TODO at apply.rs:933:

// TODO: in the future we want to investigate how to perform the writing asynchronously
//  to avoid stalling the apply loop.
self.destination.write_events(events_batch).await?;

The callback approach naturally enables this. If the destination returns from write_events quickly (just queues work), the apply loop isn't blocked. It continues processing WAL events and handling keepalives. This eliminates the wal_sender_timeout risk from slow destinations — the exact problem flagged in issue #621 where blocking write_events for backpressure also blocks keepalive processing.

One design question

Shutdown semantics: On graceful shutdown with pending confirmation channels — do we wait for them to resolve (destination is still processing), or report last_flush_lsn and accept replay on restart?

I'd lean toward waiting on graceful shutdown since the destination is still alive, and accepting replay on abrupt shutdown (channels get dropped naturally). But wanted to check if you see it differently.

The rest follows naturally from keeping it simple:

  • Only pair a channel when the batch contains a commit (no commit = no confirmation to track)
  • No need to handle out-of-order confirmation — write_events is awaited sequentially, batches arrive in order
  • Dropped oneshot = failure (simple, sufficient)
  • Backpressure is a separate concern handled by the existing MemoryMonitor/BackpressureStream
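The in-order draining this implies can be sketched as below. This is a stand-in, not the proposed etl code: it uses std::sync::mpsc in place of tokio::sync::oneshot so it runs without an async runtime, and the names are hypothetical.

```rust
use std::collections::VecDeque;
use std::sync::mpsc;

type Lsn = u64;

/// Drain confirmations from the front of the queue. Batches are written
/// sequentially, so the front always resolves first; stop at the first
/// batch that has not confirmed yet.
fn drain_confirmed(
    pending: &mut VecDeque<(Lsn, mpsc::Receiver<()>)>,
    last_flush_lsn: &mut Lsn,
) {
    loop {
        let front_confirmed = match pending.front() {
            Some((_, rx)) => rx.try_recv().is_ok(),
            None => break,
        };
        if !front_confirmed {
            // Not confirmed yet; in the real loop a dropped sender would be
            // surfaced as a write failure rather than silently skipped.
            break;
        }
        let (end_lsn, _rx) = pending.pop_front().expect("front exists");
        *last_flush_lsn = (*last_flush_lsn).max(end_lsn);
    }
}
```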

Next steps

Happy to implement this redesign — I have the full codebase context and working integration tests that should transfer to the new approach with minimal changes. The integration tests test the observable behavior (slot position, data convergence), so they stay valid regardless of the internal mechanism.

Let me know if my understanding is correct and I can start prototyping.

@iambriccardo
Contributor

Hi, I spent a day doing some experiments with asynchronous flushing. For your specific case, I recommend implementing a blocking check in the write_events method, which lets you wait for the results to come back and then tell ETL that the write succeeded simply by blocking on the method.

The asynchronous flushing implementation could theoretically improve performance, but it dramatically increases the complexity of the apply loop state machine. For this reason, I will do some benchmarks, but I am not 100% sure I will keep it.

@iambriccardo
Contributor

If you are curious about my experimental PR: #628



Development

Successfully merging this pull request may close these issues.

Allow Destination to control confirmed_flush_lsn
