experimental(core): Implement asynchronous flushing #628
Conversation
I am re-opening it to experiment with some ideas.
Pull request overview
This PR updates ETL’s streaming destination contract to support asynchronous durable flush acknowledgements, allowing write_events() to return after dispatch while replication progress only advances once a separate flush result completes.
Changes:
- Extend `Destination::write_events` to accept a `BatchFlushResult<()>` and introduce flush-result and task-management primitives (`flush_result`, `DestinationTaskSet`).
- Refactor the replication apply loop to explicitly track a single in-flight flush result, pause intake when another flush is needed, and add proactive periodic keepalive behavior based on `wal_sender_timeout`.
- Update destinations, benchmarks, and tests to the new async-flush semantics, and add a replication client helper to read `wal_sender_timeout`.
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| etl/tests/replication.rs | Adds coverage for reading wal_sender_timeout from PostgreSQL. |
| etl/tests/pipeline.rs | Adjusts pipeline test synchronization to align with post-flush state transitions. |
| etl/src/workers/pool.rs | Minor rename in join error handling for clarity. |
| etl/src/workers/apply.rs | Adds an invariant check for apply-loop completion (but currently does not fail hard). |
| etl/src/test_utils/test_destination_wrapper.rs | Updates test destination wrapper to forward async flush completion via a spawned task. |
| etl/src/test_utils/memory_destination.rs | Updates in-memory destination to send flush completion via BatchFlushResult. |
| etl/src/replication/stream.rs | Adds PeriodicKeepAlive status update type for proactive heartbeats. |
| etl/src/replication/client.rs | Adds get_wal_sender_timeout() parsing from pg_settings. |
| etl/src/replication/apply.rs | Major apply-loop refactor: explicit flush-result tracking, exit intent, and proactive keepalives. |
| etl/src/pipeline.rs | Tweaks log levels/messages around shutdown/error collection. |
| etl/src/lib.rs | Updates crate docs/example to the new write_events(..., flush_result) signature. |
| etl/src/destination/task_set.rs | New helper to track, reap, and abort destination-owned background tasks. |
| etl/src/destination/mod.rs | Exposes new destination submodules (flush_result, task_set). |
| etl/src/destination/flush_result.rs | New async flush result channel + metrics plumbing. |
| etl/src/destination/base.rs | Updates the Destination trait docs + signature for async flush reporting. |
| etl-destinations/tests/bigquery_pipeline.rs | Increases a sleep to accommodate async background writes. |
| etl-destinations/src/iceberg/core.rs | Adopts async flush result contract and task tracking for streaming writes. |
| etl-destinations/src/bigquery/test_utils.rs | Updates generic bounds to support spawned tasks ('static, Clone). |
| etl-destinations/src/bigquery/core.rs | Adopts async flush result contract and task tracking for streaming writes. |
| etl-benchmarks/benches/table_copies.rs | Updates benchmark destinations to satisfy the new write_events signature. |
Comments suppressed due to low confidence (1)
etl/src/workers/apply.rs:318
- If the apply loop ever returns `Completed` here, the worker logs an error but still returns `Ok(())`, which will make the pipeline think streaming finished successfully and stop retrying. Since this state is declared impossible for the apply worker, it should be treated as a hard error (e.g., return an `EtlError` / panic) so the failure is surfaced and the worker can restart or the pipeline can fail fast.
```rust
// The apply loop, when used via the apply worker, should never complete since
// it's always streaming indefinitely.
debug_assert!(!matches!(apply_loop_result, ApplyLoopResult::Completed));
match apply_loop_result {
    ApplyLoopResult::Completed => {
        error!("apply worker apply loop completed, but it should never complete");
    }
    ApplyLoopResult::Paused => {
        info!("apply worker apply loop paused for shutdown");
    }
}
Ok(())
```
bnjjj left a comment
Just the comment on the Rust version, otherwise LGTM.
189ce90 to 036d9c5
Summary
This PR introduces asynchronous progress tracking for streaming destination writes.
The main goal is to let destinations accept a batch synchronously, finish the actual write asynchronously, and report completion back to the apply loop only when the batch is durably flushed. That lets ETL preserve its replication guarantees without forcing every destination to block `write_events()` until all downstream work is done.

What Changed
Destination API
`Destination::write_events` now takes a `BatchFlushResult<()>` in addition to the event batch. This splits destination write handling into two phases:
- Synchronous dispatch of the batch via `write_events()`.
- Asynchronous durable completion, reported later through the `BatchFlushResult`.

To support that, this PR adds:
- `BatchFlushResult`
- `PendingBatchFlushResult`
- `CompletedBatchFlushResult`
- `BatchFlushMetrics`

These types carry both the final result and metadata needed by the apply loop, including dispatch timing and the last commit LSN associated with the batch.
A new
`DestinationTaskSet` helper was also added so destinations can safely manage spawned background tasks and clean them up during shutdown.

Apply Loop
The apply loop now tracks in-flight destination flushes explicitly instead of assuming a batch is complete as soon as
`write_events()` returns.

Key behavioral changes:
Exit / Lifecycle Handling
The previous loop-control flow was refactored around an internal
`ExitIntent` (`Pause` or `Complete`). That makes it easier to merge exit requests coming from different places.
A worker can now request that the current invocation eventually pause or complete, while the apply loop still drains any required flush/shutdown barriers before returning.
Keepalive And Shutdown Behavior
The replication client now reads PostgreSQL
`wal_sender_timeout` from `pg_settings`. The apply loop uses that value to compute a proactive keepalive deadline and sends periodic reply-requesting heartbeats when needed. This helps in cases where the loop is healthy but temporarily stalled on async flush completion, or where the source is quiet.
Shutdown was also tightened up: instead of a complex deferred mechanism, the loop now initiates graceful shutdown immediately, even if work is pending. This is acceptable because the system is designed around at-least-once delivery semantics, so some repeated data is fine.
Technical Decisions
Why use an explicit flush result channel?
Because dispatch success and durable completion are different events.
Returning only
`Result<()>` from `write_events()` made ETL treat "destination accepted the work" and "destination finished the work" as the same thing. That is fine for fully synchronous destinations, but incorrect for destinations that queue, fan out, or flush in background tasks. The new channel-based result keeps the trait generic while making that distinction explicit.
Why keep only one in-flight flush result?
To keep ordering and progress accounting simple and safe.
The apply loop still allows destinations to do internal async work, but ETL itself only advances replication state once the currently tracked batch has completed. That preserves ordered per-table streaming semantics without introducing a more complicated multi-batch acknowledgment model.
Why tie state transitions to post-flush completion?
Because transitions like catchup completion or
`sync_done -> ready` should only happen after the relevant destination work is durably finished. If those transitions happened earlier, ETL could advertise progress or unblock workers based on writes that were only queued, not actually flushed yet.