Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
b4e7013
feat(core): Implement asynchronous flushing
iambriccardo Mar 9, 2026
09e049f
Improve
iambriccardo Mar 9, 2026
624c96f
Improve
iambriccardo Mar 9, 2026
fced9fd
Improve
iambriccardo Mar 9, 2026
4804bdd
Improve
iambriccardo Mar 9, 2026
e1e6daa
Improve
iambriccardo Mar 10, 2026
10d7e6b
Merge branch 'main' into asynchronous-progress-tracking
iambriccardo Mar 10, 2026
7ef74cc
Improve
iambriccardo Mar 10, 2026
e2971a9
Improve
iambriccardo Mar 10, 2026
2666219
Improve
iambriccardo Mar 18, 2026
4abe2be
Improve
iambriccardo Mar 18, 2026
e0f58ab
Improve
iambriccardo Mar 18, 2026
4306254
Improve
iambriccardo Mar 18, 2026
b24b453
Improve
iambriccardo Mar 18, 2026
d9da3a1
Improve
iambriccardo Mar 18, 2026
8aaf784
Improve
iambriccardo Mar 18, 2026
e0d9cd3
Improve
iambriccardo Mar 18, 2026
a7fac01
Improve
iambriccardo Mar 18, 2026
b9be0b3
Improve
iambriccardo Mar 19, 2026
691d1bc
Improve
iambriccardo Mar 19, 2026
8755d97
Improve
iambriccardo Mar 19, 2026
7432bab
Improve
iambriccardo Mar 19, 2026
e358290
Improve
iambriccardo Mar 19, 2026
a54ce45
Improve
iambriccardo Mar 19, 2026
d64343c
Improve
iambriccardo Mar 19, 2026
87b5e9f
Improve
iambriccardo Mar 19, 2026
28c0ec9
Improve
iambriccardo Mar 19, 2026
6005be5
Improve
iambriccardo Mar 19, 2026
b3c3995
Improve
iambriccardo Mar 19, 2026
10a0cc2
Improve
iambriccardo Mar 19, 2026
5f4fdee
Improve
iambriccardo Mar 20, 2026
9aef34a
Improve
iambriccardo Mar 20, 2026
b56b0f7
Merge branch 'main' into asynchronous-progress-tracking
iambriccardo Mar 23, 2026
382c1be
Merge
iambriccardo Mar 23, 2026
61f96fd
Improve
iambriccardo Mar 23, 2026
9a37493
Improve
iambriccardo Mar 24, 2026
1c94359
Merge
iambriccardo Mar 24, 2026
036d9c5
Improve
iambriccardo Mar 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions docs/explanation/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ Where replicated data goes. Implement the `Destination` trait to send data anywh
```rust
pub trait Destination {
fn name() -> &'static str;
fn truncate_table(&self, table_id: TableId) -> impl Future<Output = EtlResult<()>> + Send;
fn write_table_rows(&self, table_id: TableId, rows: Vec<TableRow>) -> impl Future<Output = EtlResult<()>> + Send;
fn write_events(&self, events: Vec<Event>) -> impl Future<Output = EtlResult<()>> + Send;
fn truncate_table(&self, table_id: TableId, async_result: TruncateTableResult<()>) -> impl Future<Output = EtlResult<()>> + Send;
fn write_table_rows(&self, table_id: TableId, rows: Vec<TableRow>, async_result: WriteTableRowsResult<()>) -> impl Future<Output = EtlResult<()>> + Send;
fn write_events(&self, events: Vec<Event>, async_result: WriteEventsResult<()>) -> impl Future<Output = EtlResult<()>> + Send;
}
```

Expand All @@ -81,6 +81,11 @@ pub trait Destination {
| `write_table_rows()` | During initial copy | Receive bulk rows |
| `write_events()` | After initial copy | Receive streaming changes |

Each write-like method receives an async result handle. The intent is different per method:

- `write_events()`: after dispatch succeeds, ETL may keep processing while the destination finishes the batch.
- `truncate_table()` and `write_table_rows()`: ETL waits for the result immediately. The handle is still useful because it keeps the destination API uniform and lets implementations reuse similar internal patterns.

### Store

Persists pipeline state so replication can resume after restarts. Three traits work together:
Expand Down
30 changes: 18 additions & 12 deletions docs/explanation/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,19 +140,25 @@ Handle this by either:
- Ignoring Begin/Commit if your destination does not require transactions

```rust
async fn write_events(&self, events: Vec<Event>) -> EtlResult<()> {
for event in events {
match event {
Event::Insert(e) => self.handle_insert(e).await?,
Event::Update(e) => self.handle_update(e).await?,
Event::Delete(e) => self.handle_delete(e).await?,
Event::Truncate(e) => self.handle_truncate(e).await?,
Event::Relation(e) => self.handle_schema(e).await?,
// Transaction markers - safe to ignore for most destinations
Event::Begin(_) | Event::Commit(_) => {}
Event::Unsupported => {}
async fn write_events(&self, events: Vec<Event>, async_result: WriteEventsResult<()>) -> EtlResult<()> {
let result = async {
for event in events {
match event {
Event::Insert(e) => self.handle_insert(e).await?,
Event::Update(e) => self.handle_update(e).await?,
Event::Delete(e) => self.handle_delete(e).await?,
Event::Truncate(e) => self.handle_truncate(e).await?,
Event::Relation(e) => self.handle_schema(e).await?,
// Transaction markers - safe to ignore for most destinations
Event::Begin(_) | Event::Commit(_) => {}
Event::Unsupported => {}
}
}
Ok(())
}
.await;

async_result.send(result);
Ok(())
}
```
Expand Down Expand Up @@ -204,4 +210,4 @@ ETL batches events before calling `write_events()`. A batch may contain events f
## Next Steps

- [Custom Destinations](../guides/custom-implementations.md): Implement your own event handling
- [Architecture](architecture.md): How events flow through ETL
- [Architecture](architecture.md): How events flow through ETL
10 changes: 7 additions & 3 deletions docs/explanation/traits.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ Receives replicated data. This is the primary extension point for sending data t
pub trait Destination {
fn name() -> &'static str;
fn shutdown(&self) -> impl Future<Output = EtlResult<()>> + Send { async { Ok(()) } }
fn truncate_table(&self, table_id: TableId) -> impl Future<Output = EtlResult<()>> + Send;
fn write_table_rows(&self, table_id: TableId, table_rows: Vec<TableRow>) -> impl Future<Output = EtlResult<()>> + Send;
fn write_events(&self, events: Vec<Event>) -> impl Future<Output = EtlResult<()>> + Send;
fn truncate_table(&self, table_id: TableId, async_result: TruncateTableResult<()>) -> impl Future<Output = EtlResult<()>> + Send;
fn write_table_rows(&self, table_id: TableId, table_rows: Vec<TableRow>, async_result: WriteTableRowsResult<()>) -> impl Future<Output = EtlResult<()>> + Send;
fn write_events(&self, events: Vec<Event>, async_result: WriteEventsResult<()>) -> impl Future<Output = EtlResult<()>> + Send;
}
```

Expand All @@ -33,6 +33,10 @@ pub trait Destination {
- Operations should be idempotent when possible (ETL may retry on failure)
- Handle concurrent calls safely (parallel table sync workers)
- Process events in order to maintain data consistency
- All three write-like methods use async results, but ETL waits differently:
- `truncate_table()` waits immediately.
- `write_table_rows()` also waits immediately, requesting the next batch only after the current one finishes for that copy partition.
- `write_events()` is the method where ETL can keep processing while the destination finishes the current batch.

See [Event Types](events.md) for details on the events received by `write_events()`.

Expand Down
47 changes: 34 additions & 13 deletions docs/guides/custom-implementations.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub struct CustomStore {

impl CustomStore {
pub fn new() -> Self {
info!("Creating custom store");
info!("creating custom store");
Self {
tables: Arc::new(Mutex::new(HashMap::new())),
}
Expand Down Expand Up @@ -138,7 +138,7 @@ impl StateStore for CustomStore {
table_id: TableId,
state: TableReplicationPhase,
) -> EtlResult<()> {
info!("Table {} -> {:?}", table_id.0, state);
info!("table {} -> {:?}", table_id.0, state);
let mut tables = self.tables.lock().await;
tables.entry(table_id).or_default().state = Some(state);
Ok(())
Expand Down Expand Up @@ -236,8 +236,8 @@ impl HttpDestination {
Ok(resp) if resp.status().is_client_error() => {
bail!(ErrorKind::Unknown, "Client error", resp.status());
}
Ok(resp) => warn!("Attempt {}/3: status {}", attempt, resp.status()),
Err(e) => warn!("Attempt {}/3: {}", attempt, e),
Ok(resp) => warn!("attempt {}/3: status {}", attempt, resp.status()),
Err(e) => warn!("attempt {}/3: {}", attempt, e),
}
if attempt < 3 {
tokio::time::sleep(Duration::from_millis(500 * attempt as u64)).await;
Expand All @@ -252,16 +252,28 @@ impl Destination for HttpDestination {
"http"
}

async fn truncate_table(&self, table_id: TableId) -> EtlResult<()> {
info!("Truncating table {}", table_id.0);
self.post(&format!("tables/{}/truncate", table_id.0), json!({})).await
async fn truncate_table(
&self,
table_id: TableId,
async_result: TruncateTableResult<()>,
) -> EtlResult<()> {
info!("truncating table {}", table_id.0);
let result = self.post(&format!("tables/{}/truncate", table_id.0), json!({})).await;
async_result.send(result);
Ok(())
}

async fn write_table_rows(&self, table_id: TableId, rows: Vec<TableRow>) -> EtlResult<()> {
async fn write_table_rows(
&self,
table_id: TableId,
rows: Vec<TableRow>,
async_result: WriteTableRowsResult<()>,
) -> EtlResult<()> {
if rows.is_empty() {
async_result.send(Ok(()));
return Ok(());
}
info!("Writing {} rows to table {}", rows.len(), table_id.0);
info!("writing {} rows to table {}", rows.len(), table_id.0);

let payload = json!({
"table_id": table_id.0,
Expand All @@ -270,14 +282,21 @@ impl Destination for HttpDestination {
}).collect::<Vec<_>>()
});

self.post(&format!("tables/{}/rows", table_id.0), payload).await
let result = self.post(&format!("tables/{}/rows", table_id.0), payload).await;
async_result.send(result);
Ok(())
}

async fn write_events(&self, events: Vec<Event>) -> EtlResult<()> {
async fn write_events(
&self,
events: Vec<Event>,
async_result: WriteEventsResult<()>,
) -> EtlResult<()> {
if events.is_empty() {
async_result.send(Ok(()));
return Ok(());
}
info!("Writing {} events", events.len());
info!("writing {} events", events.len());

let payload = json!({
"events": events.iter().map(|e| {
Expand All @@ -294,7 +313,9 @@ impl Destination for HttpDestination {
}).collect::<Vec<_>>()
});

self.post("events", payload).await
let result = self.post("events", payload).await;
async_result.send(result);
Ok(())
}
}
```
Expand Down
52 changes: 42 additions & 10 deletions etl-benchmarks/benches/table_copies.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use clap::{Parser, Subcommand, ValueEnum};
use etl::destination::Destination;
use etl::destination::async_result::{
TruncateTableResult, WriteEventsResult, WriteTableRowsResult,
};
use etl::error::EtlResult;
use etl::pipeline::Pipeline;
use etl::state::table::TableReplicationPhaseType;
Expand Down Expand Up @@ -469,28 +472,43 @@ impl Destination for BenchDestination {
"bench_destination"
}

async fn truncate_table(&self, table_id: TableId) -> EtlResult<()> {
async fn truncate_table(
&self,
table_id: TableId,
async_result: TruncateTableResult<()>,
) -> EtlResult<()> {
match self {
BenchDestination::Null(dest) => dest.truncate_table(table_id).await,
BenchDestination::BigQuery(dest) => dest.truncate_table(table_id).await,
BenchDestination::Null(dest) => dest.truncate_table(table_id, async_result).await,
BenchDestination::BigQuery(dest) => dest.truncate_table(table_id, async_result).await,
}
}

async fn write_table_rows(
&self,
table_id: TableId,
table_rows: Vec<TableRow>,
async_result: WriteTableRowsResult<()>,
) -> EtlResult<()> {
match self {
BenchDestination::Null(dest) => dest.write_table_rows(table_id, table_rows).await,
BenchDestination::BigQuery(dest) => dest.write_table_rows(table_id, table_rows).await,
BenchDestination::Null(dest) => {
dest.write_table_rows(table_id, table_rows, async_result)
.await
}
BenchDestination::BigQuery(dest) => {
dest.write_table_rows(table_id, table_rows, async_result)
.await
}
}
}

async fn write_events(&self, events: Vec<Event>) -> EtlResult<()> {
async fn write_events(
&self,
events: Vec<Event>,
async_result: WriteEventsResult<()>,
) -> EtlResult<()> {
match self {
BenchDestination::Null(dest) => dest.write_events(events).await,
BenchDestination::BigQuery(dest) => dest.write_events(events).await,
BenchDestination::Null(dest) => dest.write_events(events, async_result).await,
BenchDestination::BigQuery(dest) => dest.write_events(events, async_result).await,
}
}
}
Expand All @@ -500,22 +518,36 @@ impl Destination for NullDestination {
"null"
}

async fn truncate_table(&self, _table_id: TableId) -> EtlResult<()> {
async fn truncate_table(
&self,
_table_id: TableId,
async_result: TruncateTableResult<()>,
) -> EtlResult<()> {
async_result.send(Ok(()));

Ok(())
}

async fn write_table_rows(
&self,
_table_id: TableId,
table_rows: Vec<TableRow>,
async_result: WriteTableRowsResult<()>,
) -> EtlResult<()> {
self.row_count
.fetch_add(table_rows.len() as u64, Ordering::Relaxed);
async_result.send(Ok(()));

Ok(())
}

async fn write_events(&self, _events: Vec<Event>) -> EtlResult<()> {
async fn write_events(
&self,
_events: Vec<Event>,
async_result: WriteEventsResult<()>,
) -> EtlResult<()> {
async_result.send(Ok(()));

Ok(())
}
}
Loading
Loading