Skip to content

Commit aa71dd2

Browse files
committed
fix: make before and after schema in same order
1 parent 18cc561 commit aa71dd2

File tree

2 files changed

+55
-5
lines changed

2 files changed

+55
-5
lines changed

crates/core/src/operations/merge/mod.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1154,6 +1154,14 @@ async fn execute(
11541154
.select(write_projection.clone())?
11551155
.with_column(CDC_COLUMN_NAME, lit("insert"))?,
11561156
);
1157+
1158+
let after = cdc_projection
1159+
.clone()
1160+
.filter(col(TARGET_COLUMN).is_true())?
1161+
.select(write_projection.clone())?;
1162+
1163+
// An extra select_columns is required so that before and after have the same schema order.
1164+
// DataFusion doesn't have UnionByName yet; see https://github.com/apache/datafusion/issues/12650
11571165
let before = cdc_projection
11581166
.clone()
11591167
.filter(col(crate::delta_datafusion::PATH_COLUMN).is_not_null())?
@@ -1164,13 +1172,16 @@ async fn execute(
11641172
.filter(|c| c.name != crate::delta_datafusion::PATH_COLUMN)
11651173
.map(|c| Expr::Column(c.clone()))
11661174
.collect_vec(),
1175+
)?
1176+
.select_columns(
1177+
&after
1178+
.schema()
1179+
.columns()
1180+
.iter()
1181+
.map(|v| v.name())
1182+
.collect::<Vec<_>>(),
11671183
)?;
11681184

1169-
let after = cdc_projection
1170-
.clone()
1171-
.filter(col(TARGET_COLUMN).is_true())?
1172-
.select(write_projection.clone())?;
1173-
11741185
let tracker = CDCTracker::new(before, after);
11751186
change_data.push(tracker.collect()?);
11761187
}

python/tests/test_merge.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import datetime
2+
import os
13
import pathlib
24

35
import pyarrow as pa
@@ -1038,3 +1040,40 @@ def test_merge_isin_partition_pruning(
10381040
assert result == expected
10391041
assert metrics["num_target_files_scanned"] == 2
10401042
assert metrics["num_target_files_skipped_during_scan"] == 3
1043+
1044+
1045+
def test_cdc_merge_planning_union_2908(tmp_path):
1046+
"""https://github.com/delta-io/delta-rs/issues/2908"""
1047+
cdc_path = f"{tmp_path}/_change_data"
1048+
1049+
data = {
1050+
"id": pa.array([1, 2], pa.int64()),
1051+
"date": pa.array(
1052+
[datetime.date(1970, 1, 1), datetime.date(1970, 1, 2)], pa.date32()
1053+
),
1054+
}
1055+
1056+
table = pa.Table.from_pydict(data)
1057+
1058+
dt = DeltaTable.create(
1059+
table_uri=tmp_path,
1060+
schema=table.schema,
1061+
mode="overwrite",
1062+
partition_by=["id"],
1063+
configuration={
1064+
"delta.enableChangeDataFeed": "true",
1065+
},
1066+
)
1067+
1068+
dt.merge(
1069+
source=table,
1070+
predicate="s.id = t.id",
1071+
source_alias="s",
1072+
target_alias="t",
1073+
).when_not_matched_insert_all().execute()
1074+
1075+
last_action = dt.history(1)[0]
1076+
1077+
assert last_action["operation"] == "MERGE"
1078+
assert dt.version() == 1
1079+
assert os.path.exists(cdc_path), "_change_data doesn't exist"

0 commit comments

Comments
 (0)