Skip to content

Commit 6973bcc

Browse files
authored
Add delete cdc changes (#40)
* Add delete CDC changes * Lint fix * Add queries for MySQL and Postgres providers * Fix DuckDB query * Add tests * Add CDC delete query for SQLite provider
1 parent 36ff88a commit 6973bcc

10 files changed

+1596
-44
lines changed

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ pub mod path_resolver;
4646
pub mod schema;
4747
pub mod table;
4848
pub mod table_changes;
49+
pub mod table_deletions;
4950
pub mod table_functions;
5051
pub mod types;
5152

src/metadata_provider.rs

Lines changed: 128 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,111 @@ pub const SQL_GET_DATA_FILES_ADDED_BETWEEN_SNAPSHOTS: &str = "
8888
ORDER BY data.begin_snapshot";
8989

9090
pub const SQL_GET_DELETE_FILES_ADDED_BETWEEN_SNAPSHOTS: &str = "
91-
SELECT del.begin_snapshot
92-
FROM ducklake_delete_file AS del
93-
WHERE del.table_id = ?
94-
AND del.begin_snapshot > ?
95-
AND del.begin_snapshot <= ?
96-
ORDER BY del.begin_snapshot";
91+
WITH params AS (
92+
SELECT
93+
? AS table_identifier,
94+
? AS start_snapshot,
95+
? AS finish_snapshot
96+
),
97+
98+
current_delete AS (
99+
SELECT
100+
df.data_file_id,
101+
df.begin_snapshot,
102+
df.path,
103+
df.path_is_relative,
104+
df.file_size_bytes,
105+
df.footer_size,
106+
df.encryption_key
107+
FROM ducklake_delete_file df
108+
CROSS JOIN params p
109+
WHERE df.table_id = p.table_identifier
110+
AND df.begin_snapshot BETWEEN p.start_snapshot AND p.finish_snapshot
111+
),
112+
113+
all_deletes AS (
114+
SELECT
115+
df.data_file_id,
116+
df.begin_snapshot,
117+
df.path,
118+
df.path_is_relative,
119+
df.file_size_bytes,
120+
df.footer_size,
121+
df.encryption_key
122+
FROM ducklake_delete_file df
123+
CROSS JOIN params p
124+
WHERE df.table_id = p.table_identifier
125+
)
126+
127+
SELECT
128+
data.path,
129+
data.path_is_relative,
130+
data.file_size_bytes,
131+
data.footer_size,
132+
data.row_id_start,
133+
data.record_count,
134+
data.mapping_id,
135+
136+
cd.path AS current_delete_path,
137+
cd.path_is_relative AS current_delete_path_is_relative,
138+
cd.file_size_bytes AS current_delete_file_size_bytes,
139+
cd.footer_size AS current_delete_footer_size,
140+
141+
pd.path AS previous_delete_path,
142+
pd.path_is_relative AS previous_delete_path_is_relative,
143+
pd.file_size_bytes AS previous_delete_file_size_bytes,
144+
pd.footer_size AS previous_delete_footer_size,
145+
146+
cd.begin_snapshot
147+
FROM current_delete cd
148+
JOIN ducklake_data_file data
149+
ON data.data_file_id = cd.data_file_id
150+
LEFT JOIN LATERAL (
151+
SELECT path, path_is_relative, file_size_bytes, footer_size
152+
FROM all_deletes ad
153+
WHERE ad.data_file_id = cd.data_file_id
154+
AND ad.begin_snapshot < cd.begin_snapshot
155+
ORDER BY ad.begin_snapshot DESC
156+
LIMIT 1
157+
) pd ON true
158+
CROSS JOIN params p
159+
WHERE data.table_id = p.table_identifier
160+
161+
UNION ALL
162+
163+
SELECT
164+
data.path,
165+
data.path_is_relative,
166+
data.file_size_bytes,
167+
data.footer_size,
168+
data.row_id_start,
169+
data.record_count,
170+
data.mapping_id,
171+
172+
NULL,
173+
NULL,
174+
NULL,
175+
NULL,
176+
177+
pd.path,
178+
pd.path_is_relative,
179+
pd.file_size_bytes,
180+
pd.footer_size,
181+
182+
data.end_snapshot
183+
FROM ducklake_data_file data
184+
LEFT JOIN LATERAL (
185+
SELECT path, path_is_relative, file_size_bytes, footer_size
186+
FROM all_deletes ad
187+
WHERE ad.data_file_id = data.data_file_id
188+
AND ad.begin_snapshot < data.end_snapshot
189+
ORDER BY ad.begin_snapshot DESC
190+
LIMIT 1
191+
) pd ON true
192+
CROSS JOIN params p
193+
WHERE data.table_id = p.table_identifier
194+
AND data.end_snapshot BETWEEN p.start_snapshot AND p.finish_snapshot;
195+
";
97196

98197
// Bulk queries for information_schema (avoids N+1 query problem)
99198

@@ -325,7 +424,29 @@ pub struct DataFileChange {
325424

326425
#[derive(Debug, Clone)]
327426
pub struct DeleteFileChange {
328-
pub begin_snapshot: i64,
427+
/* -------- Data file being affected -------- */
428+
pub data_file_path: String,
429+
pub data_file_path_is_relative: bool,
430+
pub data_file_size_bytes: i64,
431+
pub data_file_footer_size: i64,
432+
pub data_row_id_start: i64,
433+
pub data_record_count: i64,
434+
pub data_mapping_id: Option<i64>,
435+
436+
/* -------- Delete file added at this snapshot (None for full file deletes) -------- */
437+
pub current_delete_path: Option<String>,
438+
pub current_delete_path_is_relative: Option<bool>,
439+
pub current_delete_file_size_bytes: Option<i64>,
440+
pub current_delete_footer_size: Option<i64>,
441+
442+
/* -------- Delete file replaced (if any) -------- */
443+
pub previous_delete_path: Option<String>,
444+
pub previous_delete_path_is_relative: Option<bool>,
445+
pub previous_delete_file_size_bytes: Option<i64>,
446+
pub previous_delete_footer_size: Option<i64>,
447+
448+
/* -------- Snapshot where change occurred -------- */
449+
pub snapshot_id: i64,
329450
}
330451

331452
pub trait MetadataProvider: Send + Sync + std::fmt::Debug {

src/metadata_provider_duckdb.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,29 @@ impl MetadataProvider for DuckdbMetadataProvider {
427427
let files = stmt
428428
.query_map(params![table_id, start_snapshot, end_snapshot], |row| {
429429
Ok(DeleteFileChange {
430-
begin_snapshot: row.get(0)?,
430+
// data file
431+
data_file_path: row.get(0)?,
432+
data_file_path_is_relative: row.get(1)?,
433+
data_file_size_bytes: row.get(2)?,
434+
data_file_footer_size: row.get(3)?,
435+
data_row_id_start: row.get(4)?,
436+
data_record_count: row.get(5)?,
437+
data_mapping_id: row.get(6)?,
438+
439+
// current delete
440+
current_delete_path: row.get(7)?,
441+
current_delete_path_is_relative: row.get(8)?,
442+
current_delete_file_size_bytes: row.get(9)?,
443+
current_delete_footer_size: row.get(10)?,
444+
445+
// previous delete
446+
previous_delete_path: row.get(11)?,
447+
previous_delete_path_is_relative: row.get(12)?,
448+
previous_delete_file_size_bytes: row.get(13)?,
449+
previous_delete_footer_size: row.get(14)?,
450+
451+
// snapshot
452+
snapshot_id: row.get(15)?,
431453
})
432454
})?
433455
.collect::<Result<Vec<_>, _>>()?;

src/metadata_provider_mysql.rs

Lines changed: 127 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -529,14 +529,112 @@ impl MetadataProvider for MySqlMetadataProvider {
529529
end_snapshot: i64,
530530
) -> Result<Vec<DeleteFileChange>> {
531531
block_on(async {
532+
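// MySQL counterpart of DuckDB's SQL_GET_DELETE_FILES_ADDED_BETWEEN_SNAPSHOTS, written with positional `?` binds instead of a params CTE.
// NOTE(review): the snapshot range here excludes the start snapshot (`> ?` ... `<= ?`), while the shared constant uses BETWEEN, which includes it; confirm which bound is intended.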
533+
// Uses LATERAL (supported in MySQL 8.0.14+) for previous delete file lookup
532534
let rows = sqlx::query(
533-
"SELECT del.begin_snapshot
534-
FROM ducklake_delete_file AS del
535-
WHERE del.table_id = ?
536-
AND del.begin_snapshot > ?
537-
AND del.begin_snapshot <= ?
538-
ORDER BY del.begin_snapshot",
535+
r#"
536+
WITH current_delete AS (
537+
SELECT
538+
ddf.data_file_id,
539+
ddf.begin_snapshot,
540+
ddf.path,
541+
ddf.path_is_relative,
542+
ddf.file_size_bytes,
543+
ddf.footer_size,
544+
ddf.encryption_key
545+
FROM ducklake_delete_file ddf
546+
WHERE ddf.table_id = ?
547+
AND ddf.begin_snapshot > ?
548+
AND ddf.begin_snapshot <= ?
549+
),
550+
551+
data_files AS (
552+
SELECT df.*
553+
FROM ducklake_data_file df
554+
WHERE df.table_id = ?
555+
)
556+
557+
-- Part 1: Incremental deletes
558+
SELECT
559+
data.path,
560+
data.path_is_relative,
561+
data.file_size_bytes,
562+
data.footer_size,
563+
data.row_id_start,
564+
data.record_count,
565+
data.mapping_id,
566+
current_delete.path,
567+
current_delete.path_is_relative,
568+
current_delete.file_size_bytes,
569+
current_delete.footer_size,
570+
prev.path,
571+
prev.path_is_relative,
572+
prev.file_size_bytes,
573+
prev.footer_size,
574+
current_delete.begin_snapshot
575+
FROM current_delete
576+
JOIN data_files data USING (data_file_id)
577+
LEFT JOIN LATERAL (
578+
SELECT
579+
ddf.path,
580+
ddf.path_is_relative,
581+
ddf.file_size_bytes,
582+
ddf.footer_size
583+
FROM ducklake_delete_file ddf
584+
WHERE ddf.table_id = ?
585+
AND ddf.data_file_id = current_delete.data_file_id
586+
AND ddf.begin_snapshot < current_delete.begin_snapshot
587+
ORDER BY ddf.begin_snapshot DESC
588+
LIMIT 1
589+
) prev ON true
590+
591+
UNION ALL
592+
593+
-- Part 2: Full file deletes
594+
SELECT
595+
data.path,
596+
data.path_is_relative,
597+
data.file_size_bytes,
598+
data.footer_size,
599+
data.row_id_start,
600+
data.record_count,
601+
data.mapping_id,
602+
NULL,
603+
NULL,
604+
NULL,
605+
NULL,
606+
prev.path,
607+
prev.path_is_relative,
608+
prev.file_size_bytes,
609+
prev.footer_size,
610+
data.end_snapshot
611+
FROM ducklake_data_file data
612+
LEFT JOIN LATERAL (
613+
SELECT
614+
ddf.path,
615+
ddf.path_is_relative,
616+
ddf.file_size_bytes,
617+
ddf.footer_size
618+
FROM ducklake_delete_file ddf
619+
WHERE ddf.table_id = ?
620+
AND ddf.data_file_id = data.data_file_id
621+
AND ddf.begin_snapshot < data.end_snapshot
622+
ORDER BY ddf.begin_snapshot DESC
623+
LIMIT 1
624+
) prev ON true
625+
WHERE data.table_id = ?
626+
AND data.end_snapshot > ?
627+
AND data.end_snapshot <= ?
628+
"#,
539629
)
630+
// Part 1 bindings: table_id (current_delete), start_snapshot, end_snapshot, table_id (data_files), table_id (prev lateral)
631+
.bind(table_id)
632+
.bind(start_snapshot)
633+
.bind(end_snapshot)
634+
.bind(table_id)
635+
.bind(table_id)
636+
// Part 2 bindings: table_id (prev lateral), table_id (data), start_snapshot, end_snapshot
637+
.bind(table_id)
540638
.bind(table_id)
541639
.bind(start_snapshot)
542640
.bind(end_snapshot)
@@ -546,7 +644,29 @@ impl MetadataProvider for MySqlMetadataProvider {
546644
rows.into_iter()
547645
.map(|row| {
548646
Ok(DeleteFileChange {
549-
begin_snapshot: row.try_get(0)?,
647+
// data file
648+
data_file_path: row.try_get(0)?,
649+
data_file_path_is_relative: row.try_get(1)?,
650+
data_file_size_bytes: row.try_get(2)?,
651+
data_file_footer_size: row.try_get(3)?,
652+
data_row_id_start: row.try_get(4)?,
653+
data_record_count: row.try_get(5)?,
654+
data_mapping_id: row.try_get(6)?,
655+
656+
// current delete
657+
current_delete_path: row.try_get(7)?,
658+
current_delete_path_is_relative: row.try_get(8)?,
659+
current_delete_file_size_bytes: row.try_get(9)?,
660+
current_delete_footer_size: row.try_get(10)?,
661+
662+
// previous delete
663+
previous_delete_path: row.try_get(11)?,
664+
previous_delete_path_is_relative: row.try_get(12)?,
665+
previous_delete_file_size_bytes: row.try_get(13)?,
666+
previous_delete_footer_size: row.try_get(14)?,
667+
668+
// snapshot
669+
snapshot_id: row.try_get(15)?,
550670
})
551671
})
552672
.collect()

0 commit comments

Comments
 (0)