Commit ab2bbdd

Make sure that column types are not inferred to VARCHAR (#1006)
The `UNION BY NAME` approach introduced as a fix for issue #977 caused a new problem: if one of the files contained only empty values for an optional column, DuckDB inferred that column to be of type `VARCHAR`. The solution is to create an empty table with strict types taken from the data contract and then insert the data in a separate step, considering only the columns that exist in both the data contract and the data, since the data may be missing newly introduced columns or still contain columns that have since been removed from the contract.
1 parent 585ee42 commit ab2bbdd
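
A minimal, self-contained sketch of the two-step approach described in the commit message. The `cities` model name, the contract types, and the file glob are assumptions made up for this example, and `read_csv` stands in for whichever DuckDB read function matches the data format:

import duckdb

# Hypothetical contract-derived types and model metadata (assumptions for this sketch).
contract_types = {"city_id": "VARCHAR", "name": "VARCHAR", "population": "INTEGER"}
model_name = "cities"
model_path = "cities_*.csv"

con = duckdb.connect()

# Step 1: create an empty table with strict types from the data contract,
# so DuckDB never has to infer a type from (possibly all-empty) data.
columns_def = ", ".join(f'"{name}" {dtype}' for name, dtype in contract_types.items())
con.sql(f'CREATE TABLE "{model_name}" ({columns_def});')

# Step 2: determine the columns present in both the contract table and the files,
# then insert only those columns by name.
intersecting = con.sql(f"""SELECT column_name
    FROM (DESCRIBE SELECT * FROM read_csv('{model_path}', union_by_name=true))
    INTERSECT SELECT column_name
    FROM information_schema.columns
    WHERE table_name = '{model_name}'""").fetchall()
selected = ", ".join(col[0] for col in intersecting)
con.sql(f"""INSERT INTO "{model_name}" BY NAME
    (SELECT {selected} FROM read_csv('{model_path}', union_by_name=true));""")

Columns present only in the data (already removed from the contract) are simply not selected, and columns present only in the contract stay empty but keep their declared types, so nothing is ever inferred as `VARCHAR`.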

File tree

3 files changed: +16 −17 lines changed


datacontract/engines/soda/connections/duckdb_connection.py

Lines changed: 13 additions & 7 deletions
@@ -78,15 +78,21 @@ def create_view_with_schema_union(con, schema_obj: SchemaObject, model_path: str
    if converted_types:
        # Create empty table with contract schema
        columns_def = [f'"{col_name}" {col_type}' for col_name, col_type in converted_types.items()]
-        create_empty_table = f"""CREATE TABLE "{model_name}_schema" ({', '.join(columns_def)});"""
+        create_empty_table = f"""CREATE TABLE "{model_name}" ({', '.join(columns_def)});"""
        con.sql(create_empty_table)

-        # Create view as UNION of empty schema table and data
-        create_view_sql = f"""CREATE VIEW "{model_name}" AS
-            SELECT * FROM "{model_name}_schema"
-            UNION ALL BY NAME
-            SELECT * FROM {read_function}('{model_path}', union_by_name=true, hive_partitioning=1);"""
-        con.sql(create_view_sql)
+        # Read columns existing in both current data contract and data
+        intersecting_columns = con.sql(f"""SELECT column_name
+            FROM (DESCRIBE SELECT * FROM {read_function}('{model_path}', union_by_name=true, hive_partitioning=1))
+            INTERSECT SELECT column_name
+            FROM information_schema.columns
+            WHERE table_name = '{model_name}'""").fetchall()
+        selected_columns = ', '.join([column[0] for column in intersecting_columns])
+
+        # Insert data into table by name, but only columns existing in contract and data
+        insert_data_sql = f"""INSERT INTO {model_name} BY NAME
+            (SELECT {selected_columns} FROM {read_function}('{model_path}', union_by_name=true, hive_partitioning=1));"""
+        con.sql(insert_data_sql)
    else:
        # Fallback
        con.sql(

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+city_id,name,population
+c5193fd1-6b9b-4abc-81c5-ddb30a52d8f6,Babylon,
+cdcc7194-8f4d-40a8-84bb-92b32e0fb684,Tokyo,
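
In this fixture the optional `population` column is empty in every row, which is exactly the case that used to make DuckDB fall back to `VARCHAR`. A hypothetical illustration, assuming the fixture above were saved as `cities_old.csv` (the file name is an assumption for the example only):

import duckdb

con = duckdb.connect()
# Inspect the types DuckDB's CSV sniffer infers for the fixture above.
print(con.sql("DESCRIBE SELECT * FROM read_csv('cities_old.csv')").fetchall())
# With every value empty, population comes back as VARCHAR rather than the type
# declared in the data contract, which the strict CREATE TABLE step now prevents.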

tests/test_test_schema_evolution.py

Lines changed: 0 additions & 10 deletions
@@ -9,7 +9,6 @@ def test_csv_optional_field_missing_from_old_data():

    run = data_contract.test()

-    print(run)
    assert run.result == "passed"
    assert all(check.result == "passed" for check in run.checks)

@@ -21,7 +20,6 @@ def test_csv_optional_field_present_in_new_data():

    run = data_contract.test()

-    print(run)
    assert run.result == "passed"
    assert all(check.result == "passed" for check in run.checks)

@@ -32,7 +30,6 @@ def test_data_from_historical_and_current_schema_csv_mixed():

    run = data_contract.test()

-    print(run)
    assert run.result == "passed"
    assert all(check.result == "passed" for check in run.checks)

@@ -43,7 +40,6 @@ def test_csv_optional_field_with_invalid_values():

    run = data_contract.test()

-    print(run)
    assert run.result == "failed"
    # Should have at least one failed check for constraint violation
    assert any(check.result == "failed" for check in run.checks)
@@ -55,7 +51,6 @@ def test_csv_required_field_missing_fails():

    run = data_contract.test()

-    print(run)
    assert run.result == "failed"


@@ -70,7 +65,6 @@ def test_parquet_optional_field_missing_from_old_data():

    run = data_contract.test()

-    print(run)
    assert run.result == "passed"
    assert all(check.result == "passed" for check in run.checks)

@@ -82,7 +76,6 @@ def test_parquet_optional_field_present_in_new_data():

    run = data_contract.test()

-    print(run)
    assert run.result == "passed"
    assert all(check.result == "passed" for check in run.checks)

@@ -93,7 +86,6 @@ def test_data_from_historical_and_current_schema_parquet_mixed():

    run = data_contract.test()

-    print(run)
    assert run.result == "passed"
    assert all(check.result == "passed" for check in run.checks)

@@ -104,7 +96,6 @@ def test_parquet_optional_field_with_invalid_values():

    run = data_contract.test()

-    print(run)
    assert run.result == "failed"
    # Should have at least one failed check for constraint violation
    assert any(check.result == "failed" for check in run.checks)
@@ -116,5 +107,4 @@ def test_parquet_required_field_missing_fails():

    run = data_contract.test()

-    print(run)
    assert run.result == "failed"
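
For context, each test above follows the same pattern, shown here as a hedged sketch; the contract file path is a placeholder, not the path used in the repository:

from datacontract.data_contract import DataContract

# Placeholder path; the real tests point at fixtures for old and new schema versions.
data_contract = DataContract(data_contract_file="datacontract.yaml")
run = data_contract.test()

assert run.result == "passed"
assert all(check.result == "passed" for check in run.checks)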
