Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
b4e7013
feat(core): Implement asynchronous flushing
iambriccardo Mar 9, 2026
09e049f
Improve
iambriccardo Mar 9, 2026
624c96f
Improve
iambriccardo Mar 9, 2026
fced9fd
Improve
iambriccardo Mar 9, 2026
4804bdd
Improve
iambriccardo Mar 9, 2026
e1e6daa
Improve
iambriccardo Mar 10, 2026
10d7e6b
Merge branch 'main' into asynchronous-progress-tracking
iambriccardo Mar 10, 2026
7ef74cc
Improve
iambriccardo Mar 10, 2026
e2971a9
Improve
iambriccardo Mar 10, 2026
2666219
Improve
iambriccardo Mar 18, 2026
4abe2be
Improve
iambriccardo Mar 18, 2026
e0f58ab
Improve
iambriccardo Mar 18, 2026
4306254
Improve
iambriccardo Mar 18, 2026
b24b453
Improve
iambriccardo Mar 18, 2026
d9da3a1
Improve
iambriccardo Mar 18, 2026
8aaf784
Improve
iambriccardo Mar 18, 2026
e0d9cd3
Improve
iambriccardo Mar 18, 2026
a7fac01
Improve
iambriccardo Mar 18, 2026
b9be0b3
Improve
iambriccardo Mar 19, 2026
691d1bc
Improve
iambriccardo Mar 19, 2026
8755d97
Improve
iambriccardo Mar 19, 2026
7432bab
Improve
iambriccardo Mar 19, 2026
e358290
Improve
iambriccardo Mar 19, 2026
a54ce45
Improve
iambriccardo Mar 19, 2026
d64343c
Improve
iambriccardo Mar 19, 2026
87b5e9f
Improve
iambriccardo Mar 19, 2026
28c0ec9
Improve
iambriccardo Mar 19, 2026
6005be5
Improve
iambriccardo Mar 19, 2026
b3c3995
Improve
iambriccardo Mar 19, 2026
10a0cc2
Improve
iambriccardo Mar 19, 2026
5f4fdee
Improve
iambriccardo Mar 20, 2026
9aef34a
Improve
iambriccardo Mar 20, 2026
b56b0f7
Merge branch 'main' into asynchronous-progress-tracking
iambriccardo Mar 23, 2026
382c1be
Merge
iambriccardo Mar 23, 2026
61f96fd
Improve
iambriccardo Mar 23, 2026
9a37493
Improve
iambriccardo Mar 24, 2026
1c94359
Merge
iambriccardo Mar 24, 2026
036d9c5
Improve
iambriccardo Mar 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions etl-benchmarks/benches/table_copies.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use clap::{Parser, Subcommand, ValueEnum};
use etl::destination::Destination;
use etl::destination::{BatchFlushResult, Destination};
use etl::error::EtlResult;
use etl::pipeline::Pipeline;
use etl::state::table::TableReplicationPhaseType;
Expand Down Expand Up @@ -487,10 +487,14 @@ impl Destination for BenchDestination {
}
}

async fn write_events(&self, events: Vec<Event>) -> EtlResult<()> {
async fn write_events(
&self,
events: Vec<Event>,
flush_result: BatchFlushResult<()>,
) -> EtlResult<()> {
match self {
BenchDestination::Null(dest) => dest.write_events(events).await,
BenchDestination::BigQuery(dest) => dest.write_events(events).await,
BenchDestination::Null(dest) => dest.write_events(events, flush_result).await,
BenchDestination::BigQuery(dest) => dest.write_events(events, flush_result).await,
}
}
}
Expand All @@ -515,7 +519,13 @@ impl Destination for NullDestination {
Ok(())
}

async fn write_events(&self, _events: Vec<Event>) -> EtlResult<()> {
async fn write_events(
&self,
_events: Vec<Event>,
flush_result: BatchFlushResult<()>,
) -> EtlResult<()> {
let _ = flush_result.send(Ok(()));

Ok(())
}
}
66 changes: 59 additions & 7 deletions etl-destinations/src/bigquery/core.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use etl::destination::Destination;
use etl::destination::{BatchFlushResult, Destination, DestinationActor, DestinationActorOutcome};
use etl::error::{ErrorKind, EtlError, EtlResult};
use etl::store::schema::SchemaStore;
use etl::store::state::StateStore;
Expand Down Expand Up @@ -156,11 +156,12 @@ pub struct BigQueryDestination<S> {
pipeline_id: PipelineId,
store: S,
inner: Arc<Mutex<Inner>>,
streaming_actor: Arc<Mutex<Option<DestinationActor<Vec<Event>>>>>,
}

impl<S> BigQueryDestination<S>
where
S: StateStore + SchemaStore + Send + Sync,
S: StateStore + SchemaStore + Clone + Send + Sync + 'static,
{
/// Creates a new [`BigQueryDestination`] with a pre-configured client.
///
Expand Down Expand Up @@ -188,6 +189,7 @@ where
pipeline_id,
store,
inner: Arc::new(Mutex::new(inner)),
streaming_actor: Arc::new(Mutex::new(None)),
}
}

Expand Down Expand Up @@ -221,6 +223,7 @@ where
pipeline_id,
store,
inner: Arc::new(Mutex::new(inner)),
streaming_actor: Arc::new(Mutex::new(None)),
})
}

Expand Down Expand Up @@ -253,6 +256,7 @@ where
pipeline_id,
store,
inner: Arc::new(Mutex::new(inner)),
streaming_actor: Arc::new(Mutex::new(None)),
})
}
/// Creates a new [`BigQueryDestination`] using Application Default Credentials (ADC).
Expand Down Expand Up @@ -283,6 +287,7 @@ where
pipeline_id,
store,
inner: Arc::new(Mutex::new(inner)),
streaming_actor: Arc::new(Mutex::new(None)),
})
}

Expand Down Expand Up @@ -327,6 +332,7 @@ where
pipeline_id,
store,
inner: Arc::new(Mutex::new(inner)),
streaming_actor: Arc::new(Mutex::new(None)),
})
}

Expand Down Expand Up @@ -826,16 +832,59 @@ where

Ok(())
}

/// Returns the lazily started streaming actor.
///
/// Each queued flush batch contains both the event batch and the flush-result sender that
/// should be completed once the BigQuery write finishes. If the send back into the apply loop
/// fails, we treat that as the receiver having gone away and just log it.
async fn get_or_start_streaming_actor(&self) -> DestinationActor<Vec<Event>> {
let mut actor = self.streaming_actor.lock().await;
if let Some(actor) = actor.as_ref() {
return actor.clone();
}

let destination = self.clone();
let started_actor = DestinationActor::start(move |events, flush_result| {
let destination = destination.clone();

async move {
let result = destination.write_events(events).await;
if flush_result.send(result).is_err() {
warn!(
"failed to send bigquery flush result because apply loop was likely closed"
);
}

Ok(DestinationActorOutcome::Continue)
}
});

*actor = Some(started_actor.clone());
started_actor
}
}

impl<S> Destination for BigQueryDestination<S>
where
S: StateStore + SchemaStore + Send + Sync,
S: StateStore + SchemaStore + Clone + Send + Sync + 'static,
{
fn name() -> &'static str {
"bigquery"
}

async fn shutdown(&self) -> EtlResult<()> {
let actor = {
let mut actor = self.streaming_actor.lock().await;
actor.take()
};

match actor {
Some(actor) => actor.shutdown().await,
None => Ok(()),
}
}

async fn truncate_table(&self, table_id: TableId) -> EtlResult<()> {
self.process_truncate_for_table_ids(iter::once(table_id))
.await
Expand All @@ -851,10 +900,13 @@ where
Ok(())
}

async fn write_events(&self, events: Vec<Event>) -> EtlResult<()> {
self.write_events(events).await?;

Ok(())
async fn write_events(
&self,
events: Vec<Event>,
flush_result: BatchFlushResult<()>,
) -> EtlResult<()> {
let actor = self.get_or_start_streaming_actor().await;
actor.send(events, flush_result)
}
}

Expand Down
2 changes: 1 addition & 1 deletion etl-destinations/src/bigquery/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl BigQueryDatabase {
schema_store: S,
) -> BigQueryDestination<S>
where
S: StateStore + SchemaStore + Send + Sync,
S: StateStore + SchemaStore + Clone + Send + Sync + 'static,
{
BigQueryDestination::new_with_key_path(
self.project_id.clone(),
Expand Down
64 changes: 56 additions & 8 deletions etl-destinations/src/iceberg/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::egress::{PROCESSING_TYPE_STREAMING, PROCESSING_TYPE_TABLE_COPY, log_p
use crate::iceberg::IcebergClient;
use crate::iceberg::error::iceberg_error_to_etl_error;
use crate::table_name::try_stringify_table_name;
use etl::destination::Destination;
use etl::destination::{BatchFlushResult, Destination, DestinationActor, DestinationActorOutcome};
use etl::error::{ErrorKind, EtlResult};
use etl::store::schema::SchemaStore;
use etl::store::state::StateStore;
Expand All @@ -19,7 +19,7 @@ use etl::types::{
use etl::{bail, etl_error};
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tracing::{debug, info};
use tracing::{debug, info, warn};

/// Suffix for changelog tables
const ICEBERG_CHANGELOG_TABLE_SUFFIX: &str = "changelog";
Expand Down Expand Up @@ -94,6 +94,7 @@ pub struct IcebergDestination<S> {
client: IcebergClient,
store: S,
inner: Arc<Mutex<Inner>>,
streaming_actor: Arc<Mutex<Option<DestinationActor<Vec<Event>>>>>,
}

/// Namespace in the destination where the tables will be copied
Expand Down Expand Up @@ -151,7 +152,7 @@ struct Inner {

impl<S> IcebergDestination<S>
where
S: StateStore + SchemaStore + Send + Sync,
S: StateStore + SchemaStore + Clone + Send + Sync + 'static,
{
/// Creates a new Iceberg destination instance.
///
Expand All @@ -171,9 +172,41 @@ where
created_namespaces: HashSet::new(),
namespace,
})),
streaming_actor: Arc::new(Mutex::new(None)),
}
}

/// Returns the lazily started streaming actor.
///
/// Each queued flush batch contains both the event batch and the flush-result sender that
/// should be completed once the Iceberg write finishes. If the send back into the apply loop
/// fails, we treat that as the receiver having gone away and just log it.
async fn get_or_start_streaming_actor(&self) -> DestinationActor<Vec<Event>> {
let mut actor = self.streaming_actor.lock().await;
if let Some(actor) = actor.as_ref() {
return actor.clone();
}

let destination = self.clone();
let started_actor = DestinationActor::start(move |events, flush_result| {
let destination = destination.clone();

async move {
let result = destination.write_events(events).await;
if flush_result.send(result).is_err() {
warn!(
"failed to send iceberg flush result because apply loop was likely closed"
);
}

Ok(DestinationActorOutcome::Continue)
}
});

*actor = Some(started_actor.clone());
started_actor
}

/// Truncates an Iceberg table by dropping and recreating it.
///
/// Removes all data from the target table by dropping the existing Iceberg table
Expand Down Expand Up @@ -548,13 +581,25 @@ where

impl<S> Destination for IcebergDestination<S>
where
S: StateStore + SchemaStore + Send + Sync,
S: StateStore + SchemaStore + Clone + Send + Sync + 'static,
{
/// Returns the identifier name for this destination type.
fn name() -> &'static str {
"iceberg"
}

async fn shutdown(&self) -> EtlResult<()> {
let actor = {
let mut actor = self.streaming_actor.lock().await;
actor.take()
};

match actor {
Some(actor) => actor.shutdown().await,
None => Ok(()),
}
}

/// Truncates the specified table by dropping and recreating it.
///
/// Removes all data from the target Iceberg table while preserving
Expand Down Expand Up @@ -585,10 +630,13 @@ where
/// Handles insert, update, delete, and truncate events by converting
/// them to appropriate Iceberg operations. Events are batched by table
/// and processed concurrently for optimal performance.
async fn write_events(&self, events: Vec<Event>) -> EtlResult<()> {
self.write_events(events).await?;

Ok(())
async fn write_events(
&self,
events: Vec<Event>,
flush_result: BatchFlushResult<()>,
) -> EtlResult<()> {
let actor = self.get_or_start_streaming_actor().await;
actor.send(events, flush_result)
}
}

Expand Down
2 changes: 1 addition & 1 deletion etl-destinations/tests/bigquery_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1622,7 +1622,7 @@ async fn table_array_with_null_values() {
// We sleep to wait for the event to be processed. This is not ideal, but if we wanted to do
// this better, we would have to also implement error handling within the apply worker to write
// in the state store.
sleep(Duration::from_secs(1)).await;
sleep(Duration::from_secs(5)).await;

// Wait for the pipeline expecting an error to be returned.
let err = pipeline.shutdown_and_wait().await.err().unwrap();
Expand Down
Loading
Loading