
Commit 97399bf

Merge branch 'main' into main
2 parents: ebcff2b + 4d4714a

File tree

13 files changed: +454 -106 lines

poetry.lock

Lines changed: 190 additions & 59 deletions
Some generated files are not rendered by default.

pyiceberg/expressions/__init__.py

Lines changed: 44 additions & 3 deletions
@@ -18,11 +18,13 @@
 from __future__ import annotations

 from abc import ABC, abstractmethod
-from functools import cached_property, reduce
+from functools import cached_property
 from typing import (
     Any,
+    Callable,
     Generic,
     Iterable,
+    Sequence,
     Set,
     Tuple,
     Type,
@@ -79,6 +81,45 @@ def __or__(self, other: BooleanExpression) -> BooleanExpression:
         return Or(self, other)


+def _build_balanced_tree(
+    operator_: Callable[[BooleanExpression, BooleanExpression], BooleanExpression], items: Sequence[BooleanExpression]
+) -> BooleanExpression:
+    """
+    Recursively constructs a balanced binary tree of BooleanExpressions using the provided binary operator.
+
+    This function is a safer and more scalable alternative to:
+        reduce(operator_, items)
+
+    Using `reduce` creates a deeply nested, unbalanced tree (e.g., operator_(a, operator_(b, operator_(c, ...)))),
+    which grows linearly with the number of items. This can lead to RecursionError exceptions in Python
+    when the number of expressions is large (e.g., >1000).
+
+    In contrast, this function builds a balanced binary tree with logarithmic depth (O(log n)),
+    helping avoid recursion issues and ensuring that expression trees remain stable, predictable,
+    and safe to traverse — especially in tools like PyIceberg that operate on large logical trees.
+
+    Parameters:
+        operator_ (Callable): A binary operator function (e.g., pyiceberg.expressions.Or, And) that takes two
+            BooleanExpressions and returns a combined BooleanExpression.
+        items (Sequence[BooleanExpression]): A sequence of BooleanExpression objects to combine.
+
+    Returns:
+        BooleanExpression: The balanced combination of all input BooleanExpressions.
+
+    Raises:
+        ValueError: If the input sequence is empty.
+    """
+    if not items:
+        raise ValueError("No expressions to combine")
+    if len(items) == 1:
+        return items[0]
+    mid = len(items) // 2
+
+    left = _build_balanced_tree(operator_, items[:mid])
+    right = _build_balanced_tree(operator_, items[mid:])
+    return operator_(left, right)
+
+
 class Term(Generic[L], ABC):
     """A simple expression that evaluates to a value."""

@@ -214,7 +255,7 @@ class And(BooleanExpression):

     def __new__(cls, left: BooleanExpression, right: BooleanExpression, *rest: BooleanExpression) -> BooleanExpression:  # type: ignore
         if rest:
-            return reduce(And, (left, right, *rest))
+            return _build_balanced_tree(And, (left, right, *rest))
         if left is AlwaysFalse() or right is AlwaysFalse():
             return AlwaysFalse()
         elif left is AlwaysTrue():
@@ -257,7 +298,7 @@ class Or(BooleanExpression):

     def __new__(cls, left: BooleanExpression, right: BooleanExpression, *rest: BooleanExpression) -> BooleanExpression:  # type: ignore
         if rest:
-            return reduce(Or, (left, right, *rest))
+            return _build_balanced_tree(Or, (left, right, *rest))
         if left is AlwaysTrue() or right is AlwaysTrue():
             return AlwaysTrue()
         elif left is AlwaysFalse():
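
For context on the _build_balanced_tree change above, here is a minimal sketch of its effect. It is not part of the diff; it assumes a pyiceberg build that includes this commit, and the depth helper is illustrative only.

# Illustrative only: tree depth for a large OR chain built through the new helper.
from pyiceberg.expressions import EqualTo, Or

predicates = [EqualTo("id", i) for i in range(5_000)]
expr = Or(*predicates)  # with this commit, routed through _build_balanced_tree

def depth(node: object) -> int:
    # And/Or nodes carry .left and .right; leaf predicates do not.
    children = [getattr(node, a) for a in ("left", "right") if hasattr(node, a)]
    return 1 + max(map(depth, children), default=0)

print(depth(expr))  # on the order of log2(5000), instead of ~5000 with reduce()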

pyiceberg/io/pyarrow.py

Lines changed: 10 additions & 4 deletions
@@ -175,6 +175,7 @@
 from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.config import Config
 from pyiceberg.utils.datetime import millis_to_datetime
+from pyiceberg.utils.decimal import unscaled_to_decimal
 from pyiceberg.utils.deprecated import deprecation_message
 from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
 from pyiceberg.utils.singleton import Singleton
@@ -1776,7 +1777,7 @@ def struct(
                 field_arrays.append(array)
                 fields.append(self._construct_field(field, array.type))
             elif field.optional:
-                arrow_type = schema_to_pyarrow(field.field_type, include_field_ids=False)
+                arrow_type = schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)
                 field_arrays.append(pa.nulls(len(struct_array), type=arrow_type))
                 fields.append(self._construct_field(field, arrow_type))
             else:
@@ -1888,7 +1889,7 @@ def visit_fixed(self, fixed_type: FixedType) -> str:
         return "FIXED_LEN_BYTE_ARRAY"

     def visit_decimal(self, decimal_type: DecimalType) -> str:
-        return "FIXED_LEN_BYTE_ARRAY"
+        return "INT32" if decimal_type.precision <= 9 else "INT64" if decimal_type.precision <= 18 else "FIXED_LEN_BYTE_ARRAY"

     def visit_boolean(self, boolean_type: BooleanType) -> str:
         return "BOOLEAN"
@@ -2362,8 +2363,13 @@ def data_file_statistics_from_parquet_metadata(
                         stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length
                     )

-                    col_aggs[field_id].update_min(statistics.min)
-                    col_aggs[field_id].update_max(statistics.max)
+                    if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY":
+                        scale = stats_col.iceberg_type.scale
+                        col_aggs[field_id].update_min(unscaled_to_decimal(statistics.min_raw, scale))
+                        col_aggs[field_id].update_max(unscaled_to_decimal(statistics.max_raw, scale))
+                    else:
+                        col_aggs[field_id].update_min(statistics.min)
+                        col_aggs[field_id].update_max(statistics.max)

             except pyarrow.lib.ArrowNotImplementedError as e:
                 invalidate_col.add(field_id)
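
The decimal changes above are related: decimals with precision at most 9 or 18 are now reported as INT32/INT64 physical storage, and Parquet column statistics for such columns are decoded from the raw unscaled integer plus the Iceberg scale. A standalone sketch of that mapping using only the standard library (the helper names here are illustrative, not the library's API):

from decimal import Decimal

def physical_type_for_decimal(precision: int) -> str:
    # Mirrors the visit_decimal change: small precisions fit in plain integers.
    return "INT32" if precision <= 9 else "INT64" if precision <= 18 else "FIXED_LEN_BYTE_ARRAY"

def unscaled_to_decimal_sketch(unscaled: int, scale: int) -> Decimal:
    # Statistics for INT32/INT64-backed decimals arrive as unscaled integers,
    # e.g. unscaled=12345 with scale=2 represents Decimal("123.45").
    return Decimal(unscaled).scaleb(-scale)

print(physical_type_for_decimal(9), physical_type_for_decimal(18), physical_type_for_decimal(38))
print(unscaled_to_decimal_sketch(12345, 2))  # 123.45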

pyiceberg/table/upsert_util.py

Lines changed: 17 additions & 2 deletions
@@ -26,6 +26,7 @@
     BooleanExpression,
     EqualTo,
     In,
+    Or,
 )


@@ -39,7 +40,12 @@ def create_match_filter(df: pyarrow_table, join_cols: list[str]) -> BooleanExpre
         functools.reduce(operator.and_, [EqualTo(col, row[col]) for col in join_cols]) for row in unique_keys.to_pylist()
     ]

-    return AlwaysFalse() if len(filters) == 0 else functools.reduce(operator.or_, filters)
+    if len(filters) == 0:
+        return AlwaysFalse()
+    elif len(filters) == 1:
+        return filters[0]
+    else:
+        return Or(*filters)


 def has_duplicate_rows(df: pyarrow_table, join_cols: list[str]) -> bool:
@@ -65,7 +71,16 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
         # When the target table is empty, there is nothing to update :)
         return source_table.schema.empty_table()

-    diff_expr = functools.reduce(operator.or_, [pc.field(f"{col}-lhs") != pc.field(f"{col}-rhs") for col in non_key_cols])
+    diff_expr = functools.reduce(
+        operator.or_,
+        [
+            pc.or_kleene(
+                pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs")),
+                pc.is_null(pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs"))),
+            )
+            for col in non_key_cols
+        ],
+    )

     return (
         source_table
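
Two notes on the upsert changes above: create_match_filter now hands the whole filter list to Or(*filters), which goes through the balanced-tree constructor from the expressions change, and the row-diff expression becomes null-safe. A small standalone demonstration of why the null-safe comparison matters (illustrative table and column names; assumes a pyarrow version whose Table.filter accepts compute expressions):

import pyarrow as pa
import pyarrow.compute as pc

# A joined view with "-lhs"/"-rhs" suffixed columns, as get_rows_to_update builds.
joined = pa.table({
    "name-lhs": ["a", "b", None],
    "name-rhs": ["a", None, None],
})

plain = pc.not_equal(pc.field("name-lhs"), pc.field("name-rhs"))
null_safe = pc.or_kleene(plain, pc.is_null(plain))

# Plain != evaluates to null when either side is null, so those rows are dropped by the filter.
print(joined.filter(plain).num_rows)      # 0
# or_kleene(..., is_null(...)) flags any row whose comparison is null as changed.
print(joined.filter(null_safe).num_rows)  # 2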

pyiceberg/types.py

Lines changed: 20 additions & 1 deletion
@@ -47,6 +47,7 @@
     Field,
     PrivateAttr,
     SerializeAsAny,
+    field_validator,
     model_serializer,
     model_validator,
 )
@@ -310,6 +311,14 @@ class NestedField(IcebergType):
     ...     doc="Just a long"
     ... ))
     '2: bar: required long (Just a long)'
+    >>> str(NestedField(
+    ...     field_id=3,
+    ...     name='baz',
+    ...     field_type="string",
+    ...     required=True,
+    ...     doc="A string field"
+    ... ))
+    '3: baz: required string (A string field)'
     """

     field_id: int = Field(alias="id")
@@ -320,11 +329,21 @@
     initial_default: Optional[Any] = Field(alias="initial-default", default=None, repr=False)
     write_default: Optional[L] = Field(alias="write-default", default=None, repr=False)  # type: ignore

+    @field_validator("field_type", mode="before")
+    def convert_field_type(cls, v: Any) -> IcebergType:
+        """Convert string values into IcebergType instances."""
+        if isinstance(v, str):
+            try:
+                return IcebergType.handle_primitive_type(v, None)
+            except ValueError as e:
+                raise ValueError(f"Unsupported field type: '{v}'") from e
+        return v
+
     def __init__(
         self,
         field_id: Optional[int] = None,
         name: Optional[str] = None,
-        field_type: Optional[IcebergType] = None,
+        field_type: Optional[IcebergType | str] = None,
         required: bool = False,
         doc: Optional[str] = None,
         initial_default: Optional[Any] = None,
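
With the validator above, NestedField also accepts a primitive type name given as a string. A quick usage sketch (assumes a build with this change):

from pyiceberg.types import NestedField, StringType

field = NestedField(field_id=3, name="baz", field_type="string", required=True)
assert isinstance(field.field_type, StringType)
print(field)  # 3: baz: required string

# An unrecognized name is rejected; the validator re-raises with
# "Unsupported field type: '<name>'", surfaced through pydantic validation.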

pyproject.toml

Lines changed: 5 additions & 5 deletions
@@ -53,7 +53,7 @@ python = "^3.9.2, !=3.9.7"
 mmh3 = ">=4.0.0,<6.0.0"
 requests = ">=2.20.0,<3.0.0"
 click = ">=7.1.1,<9.0.0"
-rich = ">=10.11.0,<14.0.0"
+rich = ">=10.11.0,<15.0.0"
 strictyaml = ">=1.7.0,<2.0.0" # CVE-2020-14343 was fixed in 5.4.
 pydantic = ">=2.0,<3.0,!=2.4.0,!=2.4.1" # 2.4.0, 2.4.1 has a critical bug
 sortedcontainers = "2.4.0"
@@ -99,20 +99,20 @@ pytest-mock = "3.14.0"
 pyspark = "3.5.5"
 cython = "3.0.12"
 deptry = ">=0.14,<0.24"
-datafusion = ">=44,<46"
+datafusion = ">=44,<47"
 docutils = "!=0.21.post1" # https://github.com/python-poetry/poetry/issues/9248#issuecomment-2026240520

 [tool.poetry.group.docs.dependencies]
 # for mkdocs
 mkdocs = "1.6.1"
-griffe = "1.6.3"
+griffe = "1.7.1"
 jinja2 = "3.1.6"
-mkdocstrings = "0.29.0"
+mkdocstrings = "0.29.1"
 mkdocstrings-python = "1.16.8"
 mkdocs-literate-nav = "0.6.2"
 mkdocs-autorefs = "1.4.1"
 mkdocs-gen-files = "0.5.0"
-mkdocs-material = "9.6.9"
+mkdocs-material = "9.6.10"
 mkdocs-material-extensions = "1.3.1"
 mkdocs-section-index = "0.3.9"

tests/expressions/test_expressions.py

Lines changed: 2 additions & 2 deletions
@@ -591,11 +591,11 @@ def test_negate(lhs: BooleanExpression, rhs: BooleanExpression) -> None:
     [
         (
             And(ExpressionA(), ExpressionB(), ExpressionA()),
-            And(And(ExpressionA(), ExpressionB()), ExpressionA()),
+            And(ExpressionA(), And(ExpressionB(), ExpressionA())),
         ),
         (
             Or(ExpressionA(), ExpressionB(), ExpressionA()),
-            Or(Or(ExpressionA(), ExpressionB()), ExpressionA()),
+            Or(ExpressionA(), Or(ExpressionB(), ExpressionA())),
        ),
         (Not(Not(ExpressionA())), ExpressionA()),
     ],
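
These updated expectations follow directly from the balanced builder: for three inputs the midpoint is 3 // 2 = 1, so the left half is the single ExpressionA() and the right half collapses to And(ExpressionB(), ExpressionA()), giving And(ExpressionA(), And(ExpressionB(), ExpressionA())) instead of the left-nested And(And(ExpressionA(), ExpressionB()), ExpressionA()) that reduce produced; the same reasoning applies to Or.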

tests/expressions/test_visitors.py

Lines changed: 24 additions & 24 deletions
@@ -230,14 +230,14 @@ def test_boolean_expression_visitor() -> None:
         "NOT",
         "OR",
         "EQUALTO",
-        "OR",
         "NOTEQUALTO",
         "OR",
+        "OR",
         "EQUALTO",
         "NOT",
-        "AND",
         "NOTEQUALTO",
         "AND",
+        "AND",
     ]


@@ -335,28 +335,28 @@ def test_always_false_or_always_true_expression_binding(table_schema_simple: Sch
             ),
         ),
         And(
-            And(
-                BoundIn(
-                    BoundReference(
-                        field=NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
-                        accessor=Accessor(position=0, inner=None),
-                    ),
-                    {literal("bar"), literal("baz")},
+            BoundIn(
+                BoundReference(
+                    field=NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
+                    accessor=Accessor(position=0, inner=None),
                 ),
+                {literal("bar"), literal("baz")},
+            ),
+            And(
                 BoundEqualTo[int](
                     BoundReference(
                         field=NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
                         accessor=Accessor(position=1, inner=None),
                     ),
                     literal(1),
                 ),
-            ),
-            BoundEqualTo(
-                BoundReference(
-                    field=NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
-                    accessor=Accessor(position=0, inner=None),
+                BoundEqualTo(
+                    BoundReference(
+                        field=NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
+                        accessor=Accessor(position=0, inner=None),
+                    ),
+                    literal("baz"),
                 ),
-                literal("baz"),
             ),
         ),
     ),
@@ -408,28 +408,28 @@ def test_and_expression_binding(
             ),
         ),
         Or(
+            BoundIn(
+                BoundReference(
+                    field=NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
+                    accessor=Accessor(position=0, inner=None),
+                ),
+                {literal("bar"), literal("baz")},
+            ),
             Or(
                 BoundIn(
                     BoundReference(
                         field=NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
                         accessor=Accessor(position=0, inner=None),
                     ),
-                    {literal("bar"), literal("baz")},
+                    {literal("bar")},
                 ),
                 BoundIn(
                     BoundReference(
                         field=NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
                         accessor=Accessor(position=0, inner=None),
                     ),
-                    {literal("bar")},
-                ),
-            ),
-            BoundIn(
-                BoundReference(
-                    field=NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
-                    accessor=Accessor(position=0, inner=None),
+                    {literal("baz")},
                 ),
-                {literal("baz")},
             ),
         ),
     ),
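
The expected values in this file change only in shape: the leaf predicates are unchanged, but three-argument And/Or expressions now nest as op(first, op(second, third)) under the balanced builder, which shifts where the "AND"/"OR" tokens appear in the post-order visitor output and restructures the expected bound expression trees accordingly.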

tests/integration/test_writes/test_writes.py

Lines changed: 36 additions & 0 deletions
@@ -52,6 +52,7 @@
     DateType,
     DoubleType,
     IntegerType,
+    ListType,
     LongType,
     NestedField,
     StringType,
@@ -1647,3 +1648,38 @@ def test_abort_table_transaction_on_exception(

     # Validate the transaction is aborted and no partial update is applied
     assert len(tbl.scan().to_pandas()) == table_size  # type: ignore
+
+
+@pytest.mark.integration
+def test_write_optional_list(session_catalog: Catalog) -> None:
+    identifier = "default.test_write_optional_list"
+    schema = Schema(
+        NestedField(field_id=1, name="name", field_type=StringType(), required=False),
+        NestedField(
+            field_id=3,
+            name="my_list",
+            field_type=ListType(element_id=45, element=StringType(), element_required=False),
+            required=False,
+        ),
+    )
+    session_catalog.create_table_if_not_exists(identifier, schema)
+
+    df_1 = pa.Table.from_pylist(
+        [
+            {"name": "one", "my_list": ["test"]},
+            {"name": "another", "my_list": ["test"]},
+        ]
+    )
+    session_catalog.load_table(identifier).append(df_1)
+
+    assert len(session_catalog.load_table(identifier).scan().to_arrow()) == 2
+
+    df_2 = pa.Table.from_pylist(
+        [
+            {"name": "one"},
+            {"name": "another"},
+        ]
+    )
+    session_catalog.load_table(identifier).append(df_2)
+
+    assert len(session_catalog.load_table(identifier).scan().to_arrow()) == 4
