
Commit 0a96b52

Merge branch 'apache:main' into hive-4-support
2 parents: f97f978 + 4cac691

27 files changed: +1680 −682 lines

dev/docker-compose-integration.yml

Lines changed: 1 addition & 0 deletions
@@ -53,6 +53,7 @@ services:
       - CATALOG_WAREHOUSE=s3://warehouse/
       - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
       - CATALOG_S3_ENDPOINT=http://minio:9000
+      - CATALOG_JDBC_STRICT__MODE=true
   minio:
     image: minio/minio
     container_name: pyiceberg-minio
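
The new `CATALOG_JDBC_STRICT__MODE` variable follows the `iceberg-rest` fixture image's convention of encoding catalog properties as environment variables. A minimal sketch of that mapping, assuming the usual rule that `__` becomes `-` and `_` becomes `.`:

```python
# Sketch of the assumed CATALOG_* -> catalog-property mapping used by the
# iceberg-rest image: "__" maps to "-", "_" maps to ".", names are lowercased.
def env_to_property(env_var: str) -> str:
    key = env_var.removeprefix("CATALOG_")
    return key.replace("__", "-").replace("_", ".").lower()

assert env_to_property("CATALOG_S3_ENDPOINT") == "s3.endpoint"
assert env_to_property("CATALOG_JDBC_STRICT__MODE") == "jdbc.strict-mode"
```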

mkdocs/docs/api.md

Lines changed: 46 additions & 0 deletions
@@ -1523,6 +1523,52 @@ print(ray_dataset.take(2))
 ]
 ```
 
+### Bodo
+
+PyIceberg interfaces closely with Bodo Dataframes (see [Bodo Iceberg Quick Start](https://docs.bodo.ai/latest/quick_start/quickstart_local_iceberg/)),
+which provides a drop-in replacement for Pandas that applies query, compiler, and HPC optimizations automatically.
+Bodo accelerates and scales Python code from a single laptop to large clusters without code rewrites.
+
+<!-- prettier-ignore-start -->
+
+!!! note "Requirements"
+    This requires [`bodo` to be installed](index.md).
+
+    ```shell
+    pip install 'pyiceberg[bodo]'
+    ```
+
+<!-- prettier-ignore-end -->
+
+A table can easily be read into a Bodo Dataframe to perform Pandas operations:
+
+```python
+df = table.to_bodo()  # equivalent to `bodo.pandas.read_iceberg_table(table)`
+df = df[df["trip_distance"] >= 10.0]
+df = df[["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"]]
+print(df)
+```
+
+This creates a lazy query, optimizes it, and runs it on all available cores (`print` triggers execution):
+
+```text
+        VendorID tpep_pickup_datetime tpep_dropoff_datetime
+0              2  2023-01-01 00:27:12   2023-01-01 00:49:56
+1              2  2023-01-01 00:09:29   2023-01-01 00:29:23
+2              1  2023-01-01 00:13:30   2023-01-01 00:44:00
+3              2  2023-01-01 00:41:41   2023-01-01 01:19:32
+4              2  2023-01-01 00:22:39   2023-01-01 01:30:45
+...          ...                  ...                   ...
+245478         2  2023-01-31 22:32:57   2023-01-31 23:01:48
+245479         2  2023-01-31 22:03:26   2023-01-31 22:46:13
+245480         2  2023-01-31 23:25:56   2023-02-01 00:05:42
+245481         2  2023-01-31 23:18:00   2023-01-31 23:46:00
+245482         2  2023-01-31 23:18:00   2023-01-31 23:41:00
+
+[245483 rows x 3 columns]
+```
+
+Bodo is optimized to take advantage of Iceberg features such as hidden partitioning and various statistics for efficient reads.
+
 ### Daft
 
 PyIceberg interfaces closely with Daft Dataframes (see also: [Daft integration with Iceberg](https://docs.daft.ai/en/stable/io/iceberg/)), which provides a fully lazy, optimized query engine interface on top of PyIceberg tables.

mkdocs/docs/index.md

Lines changed: 1 addition & 0 deletions
@@ -52,6 +52,7 @@ You can mix and match optional dependencies depending on your needs:
 | pandas | Installs both PyArrow and Pandas |
 | duckdb | Installs both PyArrow and DuckDB |
 | ray    | Installs PyArrow, Pandas, and Ray |
+| bodo   | Installs Bodo |
 | daft   | Installs Daft |
 | polars | Installs Polars |
 | s3fs   | S3FS as a FileIO implementation to interact with the object store |

poetry.lock

Lines changed: 832 additions & 631 deletions
Some generated files are not rendered by default.

pyiceberg/catalog/hive.py

Lines changed: 1 addition & 1 deletion
@@ -809,7 +809,7 @@ def update_namespace_properties(
         if removals:
             for key in removals:
                 if key in parameters:
-                    parameters[key] = None
+                    parameters.pop(key)
                     removed.add(key)
         if updates:
             for key, value in updates.items():
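
The change is subtle but real: assigning `None` leaves the key in the Hive metastore parameters map, whereas `pop` actually deletes it. A minimal illustration of the difference on a plain dict:

```python
# Setting a dict value to None does not remove the key; pop does.
parameters = {"owner": "alice", "comment": "test"}

parameters["comment"] = None      # old behavior: key survives, now mapped to None
assert "comment" in parameters

parameters.pop("comment")         # fixed behavior: key is actually removed
assert "comment" not in parameters
```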

pyiceberg/catalog/rest/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -505,7 +505,7 @@ def _create_table(
         try:
             response.raise_for_status()
         except HTTPError as exc:
-            _handle_non_200_response(exc, {409: TableAlreadyExistsError})
+            _handle_non_200_response(exc, {409: TableAlreadyExistsError, 404: NoSuchNamespaceError})
         return TableResponse.model_validate_json(response.text)

     @retry(**_RETRY_ARGS)
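
With the added 404 mapping, creating a table in a namespace that does not exist should now raise `NoSuchNamespaceError` instead of surfacing as a generic HTTP error. A hedged sketch of the caller-side effect (the catalog URI and identifiers are made up):

```python
# Hypothetical usage; the catalog URI, namespace, and table name are illustrative only.
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NoSuchNamespaceError
from pyiceberg.schema import Schema

catalog = load_catalog("rest", uri="http://localhost:8181")

try:
    catalog.create_table("nonexistent_ns.my_table", schema=Schema())
except NoSuchNamespaceError:
    # Before this change, the 404 was not translated into a typed error.
    print("namespace does not exist")
```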

pyiceberg/io/pyarrow.py

Lines changed: 40 additions & 6 deletions
@@ -2410,9 +2410,12 @@ def data_file_statistics_from_parquet_metadata(
             continue

         if field_id not in col_aggs:
-            col_aggs[field_id] = StatsAggregator(
-                stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length
-            )
+            try:
+                col_aggs[field_id] = StatsAggregator(
+                    stats_col.iceberg_type, statistics.physical_type, stats_col.mode.length
+                )
+            except ValueError as e:
+                raise ValueError(f"{e} for column '{stats_col.column_name}'") from e

         if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY":
             scale = stats_col.iceberg_type.scale

@@ -2728,9 +2731,11 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table

     for partition, name in zip(spec.fields, partition_fields):
         source_field = schema.find_field(partition.source_id)
-        arrow_table = arrow_table.append_column(
-            name, partition.transform.pyarrow_transform(source_field.field_type)(arrow_table[source_field.name])
-        )
+        full_field_name = schema.find_column_name(partition.source_id)
+        if full_field_name is None:
+            raise ValueError(f"Could not find column name for field ID: {partition.source_id}")
+        field_array = _get_field_from_arrow_table(arrow_table, full_field_name)
+        arrow_table = arrow_table.append_column(name, partition.transform.pyarrow_transform(source_field.field_type)(field_array))

     unique_partition_fields = arrow_table.select(partition_fields).group_by(partition_fields).aggregate([])

@@ -2765,3 +2770,32 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.Table
     )

     return table_partitions
+
+
+def _get_field_from_arrow_table(arrow_table: pa.Table, field_path: str) -> pa.Array:
+    """Get a field from an Arrow table, supporting both literal field names and nested field paths.
+
+    This function handles two cases:
+    1. Literal field names that may contain dots (e.g., "some.id")
+    2. Nested field paths using dot notation (e.g., "bar.baz" for nested access)
+
+    Args:
+        arrow_table: The Arrow table containing the field
+        field_path: Field name or dot-separated path
+
+    Returns:
+        The field as a PyArrow Array
+
+    Raises:
+        KeyError: If the field path cannot be resolved
+    """
+    # Try exact column name match first (handles field names containing literal dots)
+    if field_path in arrow_table.column_names:
+        return arrow_table[field_path]
+
+    # If not found as an exact name, treat it as a nested field path
+    path_parts = field_path.split(".")
+    # Get the struct column from the table (e.g., "bar" from "bar.baz")
+    field_array = arrow_table[path_parts[0]]
+    # Navigate into the struct using the remaining path parts
+    return pc.struct_field(field_array, path_parts[1:])
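
The helper leans on PyArrow's `struct_field` compute function for the nested case. A self-contained sketch (made-up data) of the two lookups it distinguishes:

```python
# Illustration of the two cases: a literal dotted column name vs. a nested path.
import pyarrow as pa
import pyarrow.compute as pc

table = pa.table({
    "some.id": [1, 2],                    # literal column name containing a dot
    "bar": [{"baz": 10}, {"baz": 20}],    # struct column with a nested field
})

# Case 1: the exact column name wins, so "some.id" resolves to a plain column.
assert "some.id" in table.column_names

# Case 2: "bar.baz" is not a column name, so it is resolved as struct navigation.
nested = pc.struct_field(table["bar"], ["baz"])
print(nested)  # -> [10, 20]
```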

pyiceberg/table/__init__.py

Lines changed: 19 additions & 1 deletion
@@ -137,6 +137,7 @@
 from pyiceberg.utils.properties import property_as_bool

 if TYPE_CHECKING:
+    import bodo.pandas as bd
     import daft
     import pandas as pd
     import polars as pl

@@ -1485,6 +1486,16 @@ def to_daft(self) -> daft.DataFrame:

         return daft.read_iceberg(self)

+    def to_bodo(self) -> bd.DataFrame:
+        """Read a Bodo DataFrame lazily from this Iceberg table.
+
+        Returns:
+            bd.DataFrame: Unmaterialized Bodo Dataframe created from the Iceberg table
+        """
+        import bodo.pandas as bd
+
+        return bd.read_iceberg_table(self)
+
     def to_polars(self) -> pl.LazyFrame:
         """Lazily read from this Apache Iceberg table.

@@ -1691,7 +1702,14 @@ def to_polars(self) -> pl.DataFrame: ...

     def update(self: S, **overrides: Any) -> S:
         """Create a copy of this table scan with updated fields."""
-        return type(self)(**{**self.__dict__, **overrides})
+        from inspect import signature
+
+        # Extract only the attributes that are constructor parameters. We don't use self.__dict__
+        # as the kwargs to the constructor because it may contain additional attributes that are
+        # not part of the constructor signature.
+        params = signature(type(self).__init__).parameters.keys() - {"self"}  # Skip the "self" parameter
+        kwargs = {param: getattr(self, param) for param in params}  # Assume parameters are attributes
+
+        return type(self)(**{**kwargs, **overrides})

     def use_ref(self: S, name: str) -> S:
         if self.snapshot_id:
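
The rewritten `update` introspects the constructor so that incidental instance state never leaks into the copy. A standalone sketch of the same pattern, using a hypothetical class:

```python
# Standalone sketch of the copy-with-overrides pattern; `Scan` is hypothetical.
from inspect import signature
from typing import Any


class Scan:
    def __init__(self, snapshot_id: int, limit: int) -> None:
        self.snapshot_id = snapshot_id
        self.limit = limit
        self._cache: dict = {}  # extra state that is NOT a constructor parameter

    def update(self, **overrides: Any) -> "Scan":
        # Pass along only attributes the constructor accepts, so state like
        # `_cache` never ends up in **kwargs.
        params = signature(type(self).__init__).parameters.keys() - {"self"}
        kwargs = {param: getattr(self, param) for param in params}
        return type(self)(**{**kwargs, **overrides})


scan = Scan(snapshot_id=1, limit=10).update(limit=100)
assert (scan.snapshot_id, scan.limit) == (1, 100)
```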

pyiceberg/table/snapshots.py

Lines changed: 3 additions & 0 deletions
@@ -58,6 +58,7 @@
 TOTAL_FILE_SIZE = "total-files-size"
 CHANGED_PARTITION_COUNT_PROP = "changed-partition-count"
 CHANGED_PARTITION_PREFIX = "partitions."
+PARTITION_SUMMARY_PROP = "partition-summaries-included"
 OPERATION = "operation"

 INITIAL_SEQUENCE_NUMBER = 0

@@ -306,6 +307,8 @@ def build(self) -> Dict[str, str]:
         changed_partitions_size = len(self.partition_metrics)
         set_when_positive(properties, changed_partitions_size, CHANGED_PARTITION_COUNT_PROP)
         if changed_partitions_size <= self.max_changed_partitions_for_summaries:
+            if changed_partitions_size > 0:
+                properties[PARTITION_SUMMARY_PROP] = "true"
             for partition_path, update_metrics_partition in self.partition_metrics.items():
                 if (summary := self._partition_summary(update_metrics_partition)) and len(summary) != 0:
                     properties[CHANGED_PARTITION_PREFIX + partition_path] = summary
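
Net effect: whenever per-partition summaries are emitted, the snapshot summary also advertises that fact. A hedged sketch of what the resulting properties might look like (partition paths and metric values are made up):

```python
# Hypothetical snapshot summary properties after this change (values invented):
properties = {
    "changed-partition-count": "2",
    "partition-summaries-included": "true",  # new flag, set only when summaries are written
    "partitions.ts_day=2023-01-01": "added-data-files=3,added-records=100",
    "partitions.ts_day=2023-01-02": "added-data-files=1,added-records=50",
}
```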

pyiceberg/table/statistics.py

Lines changed: 3 additions & 3 deletions
@@ -14,7 +14,7 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from typing import Dict, List, Literal, Optional
+from typing import Dict, List, Literal, Optional, Union

 from pydantic import Field

@@ -48,7 +48,7 @@ class PartitionStatisticsFile(StatisticsCommonFields):

 def filter_statistics_by_snapshot_id(
-    statistics: List[StatisticsFile],
+    statistics: List[Union[StatisticsFile, PartitionStatisticsFile]],
     reject_snapshot_id: int,
-) -> List[StatisticsFile]:
+) -> List[Union[StatisticsFile, PartitionStatisticsFile]]:
     return [stat for stat in statistics if stat.snapshot_id != reject_snapshot_id]
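
The widened signature lets the same helper prune both table-level and partition-level statistics. A hedged usage sketch (the metadata attribute names below are assumptions, not confirmed by this diff):

```python
# Hypothetical: drop statistics entries tied to a snapshot being expired.
# `table` is an already-loaded pyiceberg Table; `partition_statistics` as a
# metadata attribute is an assumption here.
from pyiceberg.table.statistics import filter_statistics_by_snapshot_id

expired_snapshot_id = 123  # made-up ID
kept_stats = filter_statistics_by_snapshot_id(table.metadata.statistics, expired_snapshot_id)
kept_partition_stats = filter_statistics_by_snapshot_id(
    table.metadata.partition_statistics, expired_snapshot_id
)
```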
