# Delta Lake Destination — Implementation Plan

## Goals
- Implement a `DeltaDestination` that satisfies `etl::destination::Destination` for:
  - Initial table sync (`truncate_table`, `write_table_rows`)
  - CDC (`write_events`: inserts, updates, deletes, truncates)
- Preserve correctness and ordering (LSN last-wins) with durable, atomic commits.
- Avoid micro-batches with sensible batching and file sizing; support compaction.
- Support schema evolution (additive) and configurable partitioning.
- Keep idempotency and crash-safety consistent with current pipeline semantics.

## Scope and Non-Goals
- Scope: Write-path only (append/merge/delete to a Delta table), optional compaction.
- Non-goals: Reader/query engine; complex schema rewrites (rename/drop); cross-table transactions.

## Architecture

- New module: `etl-destinations/src/delta/`
  - `mod.rs`: re-exports
  - `core.rs`: `DeltaDestination<S>` implementation of `Destination`
  - `client.rs`: thin wrapper over `delta-rs` ops and object store setup
  - `schema.rs`: mapping from `etl::types::TableSchema` to Delta/Arrow schema
  - `encoding.rs`: `TableRow`/`Cell` → Arrow arrays/RecordBatch
  - `validation.rs`: config validation (paths, partitions)
- Dependencies (feature-gated, e.g. `delta`):
  - `deltalake` (delta-rs), `arrow` (aligned with delta-rs), `object_store`, `parquet`, `bytes`, `serde_json`
- Config additions in `etl-config`:
  - `DestinationConfig::Delta { base_uri, warehouse: Option<String>, table_prefix: Option<String>, partition_columns: Option<Vec<String>>, max_concurrent_writes: usize, target_file_size_mb: usize, enable_cdf: bool, optimize_after_commits: Option<u64> }` (sketched below)
- Wiring in `etl-replicator/src/core.rs`:
  - Add a `DestinationConfig::Delta` arm that constructs `DeltaDestination` with `StateStore + SchemaStore`
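
A minimal sketch of the proposed variant, assuming the existing serde conventions in `etl-config` (derive set, tagging, and the surrounding variants are placeholders, not the real enum):

```rust
use serde::{Deserialize, Serialize};

/// Sketch only: the real enum lives in `etl-config` next to the existing variants.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DestinationConfig {
    // ...existing variants (e.g. BigQuery, Memory)
    Delta {
        /// Root URI of the lakehouse, e.g. "s3://bucket/warehouse" or "file:///tmp/delta".
        base_uri: String,
        warehouse: Option<String>,
        table_prefix: Option<String>,
        /// Partition columns applied to destination tables; validated against the schema.
        partition_columns: Option<Vec<String>>,
        max_concurrent_writes: usize,
        target_file_size_mb: usize,
        enable_cdf: bool,
        /// Run OPTIMIZE-style compaction every N commits when set.
        optimize_after_commits: Option<u64>,
    },
}
```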

## Semantics

- Table naming and mapping
  - Destination table path per source `TableId`:
    - `<base_uri>/<table_prefix>/<schema>__<table>` (escape `_` as needed; mirror BigQuery naming rule)
  - Persist mapping in `SchemaStore`/`StateStore` (`table_mappings`) to remain stable across restarts.
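
A rough sketch of the path derivation; identifier escaping is omitted here and should mirror the existing BigQuery naming rule, and the function name is illustrative:

```rust
/// Builds the Delta table location for a source table, e.g.
/// "s3://bucket/warehouse/etl/public__orders".
fn delta_table_uri(
    base_uri: &str,
    table_prefix: Option<&str>,
    schema: &str,
    table: &str,
) -> String {
    let base = base_uri.trim_end_matches('/');
    let name = format!("{schema}__{table}");
    match table_prefix {
        Some(prefix) => format!("{base}/{prefix}/{name}"),
        None => format!("{base}/{name}"),
    }
}
```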

- Schema mapping
  - `TableSchema` → Arrow/Delta schema:
    - Scalars: bool/int/float/text/timestamp/uuid/json → Arrow equivalents
    - Numeric/decimal: if precision/scale unknown, map to string (match current practice)
    - Arrays: Arrow `List`
  - Always include PK columns (from source schema metadata)
  - For additive changes: `ALTER TABLE ADD COLUMN` (nullable). Do not drop/rename in v1.
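
A hedged sketch of the scalar mapping; `SourceType` is a stand-in for whatever column-type enum `etl::types` exposes, and uuid/json/unbounded numerics fall back to UTF-8 strings:

```rust
use arrow::datatypes::{DataType, TimeUnit};

/// Stand-in for the source column-type enum exposed by `etl::types`.
enum SourceType {
    Bool,
    Int4,
    Int8,
    Float8,
    Text,
    TimestampTz,
    Uuid,
    Json,
    Numeric,
}

/// Maps a scalar source type to an Arrow type; array columns would wrap the
/// element type in `DataType::List`. Numeric falls back to Utf8 when
/// precision/scale are unknown, matching current practice.
fn to_arrow(ty: &SourceType) -> DataType {
    match ty {
        SourceType::Bool => DataType::Boolean,
        SourceType::Int4 => DataType::Int32,
        SourceType::Int8 => DataType::Int64,
        SourceType::Float8 => DataType::Float64,
        SourceType::TimestampTz => DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
        SourceType::Text | SourceType::Uuid | SourceType::Json | SourceType::Numeric => DataType::Utf8,
    }
}
```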

- Initial table sync
  - `truncate_table`:
    - Prefer an atomic empty snapshot: commit with "remove all" (delete predicate `true`) or recreate the table version (depending on delta-rs capability).
  - `write_table_rows(table_id, rows)`:
    - Convert to Arrow `RecordBatch` in chunks sized to `target_file_size_mb`.
    - Use the delta-rs writer in `append` mode. Optional partitioning by configured columns.
    - Parallelize per table if the caller invokes concurrently; respect `max_concurrent_writes` for internal splits.
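
A sketch of the append path against delta-rs, assuming a recent `deltalake` release with write operations enabled (the `datafusion` feature in current versions); exact module paths and builder names should be checked against the pinned version, and `append_batches` is an illustrative helper:

```rust
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::protocol::SaveMode;
use deltalake::{DeltaOps, DeltaTableError};

/// Appends already-encoded record batches to the table at `table_uri`
/// as a single Delta commit.
async fn append_batches(table_uri: &str, batches: Vec<RecordBatch>) -> Result<(), DeltaTableError> {
    let ops = DeltaOps::try_from_uri(table_uri).await?;
    ops.write(batches).with_save_mode(SaveMode::Append).await?;
    Ok(())
}
```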

- CDC (`write_events`)
  - Group events by `table_id`. For each table:
    - Build three in-memory sets from the batch:
      - Upserts (Insert + Update): last-wins by PK using LSN order within the batch.
      - Deletes: by PK.
      - Affected PK set = upsert_keys ∪ delete_keys.
    - Transactional commit:
      1) Delete all rows with PK in the affected set (Delta delete predicate; for composite PKs, build a predicate disjunction or write a temporary helper file + merge path).
      2) Append the deduped upsert rows as new files.
  - Ordering/idempotency:
    - Last-wins inside a batch via LSN; across batches the pipeline guarantees ordered delivery and only advances the LSN after a successful commit.
    - Optional: use the Delta `txn` app-level id for extra dedupe safety, with `appId="etl-<pipeline>-<table>"` and a monotonic version (e.g., a per-table sequence stored in `StateStore`).
  - Truncate events:
    - Handle the same as `truncate_table` and continue.
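
For the delete step, one option is to render SQL-style predicates over the affected keys, chunked so no single predicate grows unbounded; this sketch assumes a single string-typed PK column (composite keys would OR together per-row conjunctions):

```rust
/// Renders chunked `pk IN (...)` predicates for the affected keys.
/// String keys are single-quoted with embedded quotes doubled.
fn pk_delete_predicates(pk_col: &str, keys: &[String], chunk_size: usize) -> Vec<String> {
    keys.chunks(chunk_size)
        .map(|chunk| {
            let list = chunk
                .iter()
                .map(|k| format!("'{}'", k.replace('\'', "''")))
                .collect::<Vec<_>>()
                .join(", ");
            format!("{pk_col} IN ({list})")
        })
        .collect()
}
```

Each rendered predicate feeds the Delta delete operation; whether multiple chunks can share a single transaction depends on what the pinned delta-rs version exposes.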

- Partitioning
  - Default: no partitioning.
  - Optional per-table partition columns from config (validate existence and low cardinality).
  - Warn if PK chosen as partition key (can cause skew and small files).
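
Validation can be a pure function over the cached schema; the column/PK inputs below are stand-ins for whatever `TableSchema` exposes:

```rust
/// Checks configured partition columns against the table schema.
/// Returns hard errors (unknown columns) and warnings (PK used as partition key).
fn validate_partition_columns(
    partition_columns: &[String],
    schema_columns: &[String],
    pk_columns: &[String],
) -> (Vec<String>, Vec<String>) {
    let mut errors = Vec::new();
    let mut warnings = Vec::new();
    for col in partition_columns {
        if !schema_columns.contains(col) {
            errors.push(format!("partition column `{col}` not found in table schema"));
        }
        if pk_columns.contains(col) {
            warnings.push(format!(
                "partition column `{col}` is a primary key column; expect skew and small files"
            ));
        }
    }
    (errors, warnings)
}
```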

- Micro-batch mitigation and file sizing
  - Accumulate rows into writer-side batches targeting ~`target_file_size_mb` (e.g., 128–256 MB).
  - During low throughput, still flush on the pipeline's `batch.max_fill_ms` to bound latency, but coalesce inside the destination before closing Parquet files when possible.
  - Optional background compaction: run `OPTIMIZE` (small-file coalescing) every N commits if `optimize_after_commits` is set.
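
A rough sketch of size-based chunking; the per-row byte estimate is an assumption (a cheap heuristic over cell widths), and the real `encoding.rs` would build Arrow arrays column-wise per chunk:

```rust
/// Splits rows into chunks whose estimated encoded size stays near the target,
/// so each flush closes Parquet files of roughly `target_file_size_mb`
/// instead of many small ones.
fn chunk_rows_by_size<R>(
    rows: Vec<R>,
    estimate_bytes: impl Fn(&R) -> usize,
    target_file_size_mb: usize,
) -> Vec<Vec<R>> {
    let target = target_file_size_mb * 1024 * 1024;
    let mut chunks = Vec::new();
    let mut current = Vec::new();
    let mut current_bytes = 0usize;
    for row in rows {
        let size = estimate_bytes(&row);
        if !current.is_empty() && current_bytes + size > target {
            chunks.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current_bytes += size;
        current.push(row);
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    chunks
}
```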

- Schema evolution during CDC
  - On schema cache change (from `SchemaStore`), reconcile:
    - Add missing columns as nullable in Delta.
    - Fill absent values as null/default on write.
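
Reconciliation can start from a plain diff of the source-derived Arrow schema against the current Delta table schema; applying the result as nullable column additions goes through whatever metadata/alter path the pinned delta-rs version exposes:

```rust
use std::sync::Arc;

use arrow::datatypes::{Field, Schema};

/// Returns fields present in the desired (source-derived) schema but missing
/// from the current table schema, forced to nullable so the addition stays
/// backward compatible with existing files.
fn missing_columns(desired: &Schema, current: &Schema) -> Vec<Arc<Field>> {
    desired
        .fields()
        .iter()
        .filter(|field| current.field_with_name(field.name()).is_err())
        .map(|field| Arc::new(field.as_ref().clone().with_nullable(true)))
        .collect()
}
```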

- Error handling and retries
  - Destination writes are idempotent given delete-then-append per affected PK set.
  - Commit failures: no LSN advance; the batch will be retried; repeating delete+append results in the same final state.
  - Surface structured errors; include commit metrics.

- Metrics
  - Counters: rows written, deletes applied, commits, optimized files.
  - Gauges: file sizes, rows per file, commit duration.
  - Logs: per-table commit stats; bytes written.

## Pseudocode

- write_table_rows
```
fn write_table_rows(table_id, table_rows):
  ensure_table_exists_and_schema(table_id)
  batches = chunk_into_record_batches(table_rows, target_file_size_mb)
  for batch in batches:
    delta_ops(table_id).append(batch).await
```

- write_events
```
fn write_events(events):
  events_by_table = group_by_table(events)

  for (table_id, evs) in events_by_table:
    ensure_table_exists_and_schema(table_id)

    // Deduplicate by PK with last-wins using (commit_lsn, start_lsn)
    upserts_by_pk = HashMap<PK, Row>
    delete_pks = HashSet<PK>
    for e in evs.in_order():
      match e {
        Insert|Update => upserts_by_pk.insert(pk(e.row), e.row) // overwrite: last wins
        Delete => { upserts_by_pk.remove(&pk(e)); delete_pks.insert(pk(e)); }
        Truncate => {
          // A truncate supersedes everything accumulated so far in this batch,
          // so clear the pending upserts/deletes before continuing.
          handle_truncate(table_id)
          upserts_by_pk.clear(); delete_pks.clear()
        }
      }

    affected_pks = union(keys(upserts_by_pk), delete_pks)

    // One transaction: delete + append
    begin_tx(table_id, app_id, maybe_txn_version)
    if !affected_pks.is_empty():
      delete_where_pk_in(affected_pks)
    if !upserts_by_pk.is_empty():
      record_batches = chunk_into_record_batches(values(upserts_by_pk), target_file_size_mb)
      for rb in record_batches:
        append(rb)
    commit_tx()
```

## Integration Points

- `etl-config`:
  - Add `Delta` variant in `DestinationConfig` + serde.
  - Validation for URIs and partition columns.

- `etl-replicator/src/core.rs`:
  - Handle `DestinationConfig::Delta` creation.

- `etl-destinations`:
  - New `delta` module; feature flag; export `DeltaDestination`.

- `etl`:
  - Reuse existing batching; no changes required.
  - Optional: a small helper to get PK metadata from `TableSchema` if not already exposed.

## Testing Plan

- Unit tests (file:// local object store):
  - Create table; append rows; verify snapshot row count.
  - CDC last-wins semantics within a batch; across multiple batches.
  - Deletes only; upserts+deletes on same key.
  - Truncate behavior.
  - Schema add column: write with/without new column present.
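
The in-batch last-wins rule is easy to cover before any Delta I/O exists; the dedup helper below is a self-contained stand-in for the corresponding step in `write_events` (integer PKs and string payloads are illustrative, not the real row types):

```rust
use std::collections::{HashMap, HashSet};

#[derive(Debug, Clone)]
enum Op {
    Insert(i64, &'static str),
    Update(i64, &'static str),
    Delete(i64),
}

/// Stand-in for the dedup step: returns (upserts by PK, delete PKs), last-wins in input order.
fn dedup(ops: &[Op]) -> (HashMap<i64, &'static str>, HashSet<i64>) {
    let mut upserts = HashMap::new();
    let mut deletes = HashSet::new();
    for op in ops {
        match op {
            Op::Insert(pk, row) | Op::Update(pk, row) => {
                deletes.remove(pk);
                upserts.insert(*pk, *row);
            }
            Op::Delete(pk) => {
                upserts.remove(pk);
                deletes.insert(*pk);
            }
        }
    }
    (upserts, deletes)
}

#[test]
fn last_wins_within_batch() {
    let ops = [
        Op::Insert(1, "a"),
        Op::Update(1, "b"),
        Op::Delete(2),
        Op::Insert(2, "c"),
    ];
    let (upserts, deletes) = dedup(&ops);
    assert_eq!(upserts[&1], "b"); // update overwrites the earlier insert
    assert_eq!(upserts[&2], "c"); // insert after delete resurrects the row
    assert!(deletes.is_empty());
}
```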

- Integration tests:
  - Run pipeline with memory source into Delta file store; verify final state.
  - Idempotency: inject failure after write, before LSN advance; rerun.

- Performance/sizing tests:
  - Validate file sizes approach target.
  - Validate compaction reduces small files.

## Milestones

- M1: Scaffolding and config; create/append; initial sync end-to-end.
- M2: CDC write path (delete+append), last-wins, idempotency; truncate.
- M3: Schema evolution (add column), partitioning, metrics.
- M4: Compaction/OPTIMIZE, tuning, docs, examples.

## Risks and Mitigations

- Delete predicate scalability for large PK sets:
  - Mitigate with chunked delete predicates or a temporary helper table + merge (if delta-rs supports it).
- Delta merge support maturity:
  - Start with delete+append; add a merge path when stable.
- Small files during low throughput:
  - Larger writer buffers; periodic compaction; configurable flush.
- Schema drift:
  - Additive only in v1; strict validation and logging on incompatible changes.

## Documentation

- Add a `docs/tutorials` entry for Delta destination setup and configuration.
- Example configs for S3/GCS/Azure using `object_store` env/creds.