Skip to content

Commit 79907fd

Browse files
committed
fix: spaced columns parsing
Signed-off-by: Ion Koutsouris <[email protected]>
1 parent 1ec381d commit 79907fd

File tree

4 files changed

+184
-3
lines changed

4 files changed

+184
-3
lines changed

crates/core/src/delta_datafusion/expr.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,10 @@ impl Display for BinaryExprFormat<'_> {
333333
impl Display for SqlFormat<'_> {
334334
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
335335
match self.expr {
336-
Expr::Column(c) => write!(f, "{c}"),
336+
Expr::Column(c) => {
337+
let c = c.quoted_flat_name();
338+
write!(f, "{c}")
339+
}
337340
Expr::Literal(v) => write!(f, "{}", ScalarValueFormat { scalar: v }),
338341
Expr::Case(case) => {
339342
write!(f, "CASE ")?;

crates/core/src/delta_datafusion/mod.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1591,7 +1591,7 @@ impl DeltaDataChecker {
15911591
return Ok(());
15921592
}
15931593
let table = MemTable::try_new(record_batch.schema(), vec![vec![record_batch.clone()]])?;
1594-
1594+
table.schema();
15951595
// Use a random table name to avoid clashes when running multiple parallel tasks, e.g. when using a partitioned table
15961596
let table_name: String = uuid::Uuid::new_v4().to_string();
15971597
self.ctx.register_table(&table_name, Arc::new(table))?;
@@ -2408,6 +2408,58 @@ mod tests {
24082408
assert!(result.is_err());
24092409
}
24102410

2411+
/// Ensure that constraints still work when there are spaces in the field name
2412+
///
2413+
/// See <https://github.com/delta-io/delta-rs/pull/3374>
2414+
#[tokio::test]
2415+
async fn test_constraints_with_spacey_fields() -> DeltaResult<()> {
2416+
let schema = Arc::new(Schema::new(vec![
2417+
Field::new("a", ArrowDataType::Utf8, false),
2418+
Field::new("b bop", ArrowDataType::Int32, false),
2419+
]));
2420+
let batch = RecordBatch::try_new(
2421+
Arc::clone(&schema),
2422+
vec![
2423+
Arc::new(arrow::array::StringArray::from(vec![
2424+
"a", "b bop", "c", "d",
2425+
])),
2426+
Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
2427+
],
2428+
)?;
2429+
2430+
// Valid invariants return Ok(())
2431+
let constraints = vec![
2432+
Constraint::new("custom a", "a is not null"),
2433+
Constraint::new("custom_b", "`b bop` < 1000"),
2434+
];
2435+
assert!(DeltaDataChecker::new_with_constraints(constraints)
2436+
.check_batch(&batch)
2437+
.await
2438+
.is_ok());
2439+
2440+
// Violated invariants return an error with a list of violations
2441+
let constraints = vec![
2442+
Constraint::new("custom_a", "a is null"),
2443+
Constraint::new("custom_B", "\"b bop\" < 100"),
2444+
];
2445+
let result = DeltaDataChecker::new_with_constraints(constraints)
2446+
.check_batch(&batch)
2447+
.await;
2448+
assert!(result.is_err());
2449+
assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
2450+
if let Err(DeltaTableError::InvalidData { violations }) = result {
2451+
assert_eq!(violations.len(), 2);
2452+
}
2453+
2454+
// Irrelevant constraints return a different error
2455+
let constraints = vec![Constraint::new("custom_c", "c > 2000")];
2456+
let result = DeltaDataChecker::new_with_constraints(constraints)
2457+
.check_batch(&batch)
2458+
.await;
2459+
assert!(result.is_err());
2460+
Ok(())
2461+
}
2462+
24112463
#[test]
24122464
fn roundtrip_test_delta_exec_plan() {
24132465
let ctx = SessionContext::new();

crates/core/src/operations/write/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,6 @@ impl std::future::IntoFuture for WriteBuilder {
593593
Expression::String(s) => {
594594
let df_schema = DFSchema::try_from(schema.as_ref().to_owned())?;
595595
parse_predicate_expression(&df_schema, s, &state)?
596-
// this.snapshot.unwrap().parse_predicate_expression(s, &state)?
597596
}
598597
};
599598
(Some(fmt_expr_to_sql(&pred)?), Some(pred))

python/tests/test_constraint.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
import pytest
2+
from arro3.core import Array, DataType, Field, Schema, Table
3+
4+
from deltalake import DeltaTable, write_deltalake
5+
from deltalake.exceptions import DeltaError, DeltaProtocolError
6+
7+
8+
@pytest.fixture()
9+
def sample_table() -> Table:
10+
nrows = 5
11+
return Table(
12+
{
13+
"id": Array(
14+
["1", "2", "3", "4", "5"],
15+
Field("id", type=DataType.string(), nullable=True),
16+
),
17+
"high price": Array(
18+
list(range(nrows)),
19+
Field("high price", type=DataType.int64(), nullable=True),
20+
),
21+
},
22+
)
23+
24+
25+
def test_not_corrupting_expression(tmp_path):
26+
data = Table.from_pydict(
27+
{
28+
"b": Array([1], DataType.int64()),
29+
"color_column": Array(["red"], DataType.string()),
30+
},
31+
)
32+
33+
data2 = Table.from_pydict(
34+
{
35+
"b": Array([1], DataType.int64()),
36+
"color_column": Array(["blue"], DataType.string()),
37+
},
38+
)
39+
40+
write_deltalake(
41+
tmp_path,
42+
data,
43+
mode="overwrite",
44+
partition_by=["color_column"],
45+
predicate="color_column = 'red'",
46+
)
47+
write_deltalake(
48+
tmp_path,
49+
data2,
50+
mode="overwrite",
51+
partition_by=["color_column"],
52+
predicate="color_column = 'blue'",
53+
)
54+
55+
56+
def test_not_corrupting_expression_columns_spaced(tmp_path):
57+
data = Table.from_pydict(
58+
{
59+
"b": Array([1], DataType.int64()),
60+
"color column": Array(["red"], DataType.string()),
61+
},
62+
)
63+
64+
data2 = Table.from_pydict(
65+
{
66+
"b": Array([1], DataType.int64()),
67+
"color column": Array(["blue"], DataType.string()),
68+
},
69+
)
70+
71+
write_deltalake(
72+
tmp_path,
73+
data,
74+
mode="overwrite",
75+
# partition_by=["color column"],
76+
predicate="`color column` = 'red'",
77+
)
78+
write_deltalake(
79+
tmp_path,
80+
data2,
81+
mode="overwrite",
82+
# partition_by=["color column"],
83+
predicate="`color column` = 'blue'",
84+
)
85+
86+
87+
# fmt: off
88+
89+
@pytest.mark.parametrize("sql_string", [
90+
"`high price` >= 0",
91+
'"high price" >= 0',
92+
"\"high price\" >= 0"
93+
])
94+
def test_add_constraint(tmp_path, sample_table: Table, sql_string: str):
95+
write_deltalake(tmp_path, sample_table)
96+
97+
dt = DeltaTable(tmp_path)
98+
99+
dt.alter.add_constraint({"check_price": sql_string})
100+
101+
last_action = dt.history(1)[0]
102+
assert last_action["operation"] == "ADD CONSTRAINT"
103+
assert dt.version() == 1
104+
assert dt.metadata().configuration == {
105+
"delta.constraints.check_price": '"high price" >= 0'
106+
}
107+
assert dt.protocol().min_writer_version == 3
108+
109+
with pytest.raises(DeltaError):
110+
# Invalid constraint
111+
dt.alter.add_constraint({"check_price": '"high price" < 0'})
112+
113+
with pytest.raises(DeltaProtocolError):
114+
data = Table(
115+
{
116+
"id": Array(["1"], DataType.string()),
117+
"high price": Array([-1], DataType.int64()),
118+
},
119+
schema=Schema(
120+
fields=[
121+
Field("id", type=DataType.string(), nullable=True),
122+
Field("high price", type=DataType.int64(), nullable=True),
123+
]
124+
),
125+
)
126+
127+
write_deltalake(tmp_path, data, mode="append")

0 commit comments

Comments
 (0)