Skip to content

Commit f6084a6

Browse files
committed
Merge branch 'main' of github.com:apache/iceberg-python into fd-add-ability-to-delete-full-data-files
2 parents 8d45920 + 5ef9f3d commit f6084a6

File tree

9 files changed

+212
-67
lines changed

9 files changed

+212
-67
lines changed

pyiceberg/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
__version__ = "0.6.0"
18+
__version__ = "0.6.1"

pyiceberg/io/pyarrow.py

Lines changed: 52 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1761,54 +1761,67 @@ def data_file_statistics_from_parquet_metadata(
17611761

17621762

17631763
def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
1764-
task = next(tasks)
1765-
1766-
try:
1767-
_ = next(tasks)
1768-
# If there are more tasks, raise an exception
1769-
raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
1770-
except StopIteration:
1771-
pass
1772-
1773-
parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
1774-
1775-
file_path = f'{table_metadata.location}/data/{task.generate_data_file_filename("parquet")}'
17761764
schema = table_metadata.schema()
17771765
arrow_file_schema = schema.as_arrow()
1766+
parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
17781767

1779-
fo = io.new_output(file_path)
17801768
row_group_size = PropertyUtil.property_as_int(
17811769
properties=table_metadata.properties,
17821770
property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
17831771
default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
17841772
)
1785-
with fo.create(overwrite=True) as fos:
1786-
with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer:
1787-
writer.write_table(task.df, row_group_size=row_group_size)
1788-
1789-
statistics = data_file_statistics_from_parquet_metadata(
1790-
parquet_metadata=writer.writer.metadata,
1791-
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
1792-
parquet_column_mapping=parquet_path_to_id_mapping(schema),
1793-
)
1794-
data_file = DataFile(
1795-
content=DataFileContent.DATA,
1796-
file_path=file_path,
1797-
file_format=FileFormat.PARQUET,
1798-
partition=Record(),
1799-
file_size_in_bytes=len(fo),
1800-
# After this has been fixed:
1801-
# https://github.com/apache/iceberg-python/issues/271
1802-
# sort_order_id=task.sort_order_id,
1803-
sort_order_id=None,
1804-
# Just copy these from the table for now
1805-
spec_id=table_metadata.default_spec_id,
1806-
equality_ids=None,
1807-
key_metadata=None,
1808-
**statistics.to_serialized_dict(),
1809-
)
18101773

1811-
return iter([data_file])
1774+
def write_parquet(task: WriteTask) -> DataFile:
1775+
file_path = f'{table_metadata.location}/data/{task.generate_data_file_filename("parquet")}'
1776+
fo = io.new_output(file_path)
1777+
with fo.create(overwrite=True) as fos:
1778+
with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer:
1779+
writer.write(pa.Table.from_batches(task.record_batches), row_group_size=row_group_size)
1780+
1781+
statistics = data_file_statistics_from_parquet_metadata(
1782+
parquet_metadata=writer.writer.metadata,
1783+
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
1784+
parquet_column_mapping=parquet_path_to_id_mapping(schema),
1785+
)
1786+
data_file = DataFile(
1787+
content=DataFileContent.DATA,
1788+
file_path=file_path,
1789+
file_format=FileFormat.PARQUET,
1790+
partition=Record(),
1791+
file_size_in_bytes=len(fo),
1792+
# After this has been fixed:
1793+
# https://github.com/apache/iceberg-python/issues/271
1794+
# sort_order_id=task.sort_order_id,
1795+
sort_order_id=None,
1796+
# Just copy these from the table for now
1797+
spec_id=table_metadata.default_spec_id,
1798+
equality_ids=None,
1799+
key_metadata=None,
1800+
**statistics.to_serialized_dict(),
1801+
)
1802+
1803+
return data_file
1804+
1805+
executor = ExecutorFactory.get_or_create()
1806+
data_files = executor.map(write_parquet, tasks)
1807+
1808+
return iter(data_files)
1809+
1810+
1811+
def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[pa.RecordBatch]]:
    """Group the record batches of an Arrow table into bins of about ``target_file_size`` bytes.

    The table is first split into record batches whose row count is derived
    from the table's average row size, and those batches are then bin-packed
    using each batch's ``nbytes`` as its weight.

    Args:
        tbl: The Arrow table to split.
        target_file_size: Target weight, in bytes, of each bin.

    Returns:
        An iterator of bins, each bin being a list of record batches. The
        iterator is empty when ``tbl`` has no rows.
    """
    from pyiceberg.utils.bin_packing import PackingIterator

    if tbl.num_rows == 0:
        # Nothing to pack; this also avoids dividing by zero below.
        return iter([])

    avg_row_size_bytes = tbl.nbytes / tbl.num_rows
    if avg_row_size_bytes > 0:
        # max_chunksize is a row count, so pass an int rather than the float
        # that floor-dividing by a float produces, and never ask for fewer
        # than one row per batch.
        target_rows_per_file = max(1, int(target_file_size // avg_row_size_bytes))
    else:
        # Rows that report no bytes cannot be sized; keep the table together.
        target_rows_per_file = tbl.num_rows

    batches = tbl.to_batches(max_chunksize=target_rows_per_file)
    return PackingIterator(
        items=batches,
        target_weight=target_file_size,
        lookback=len(batches),  # ignore lookback
        weight_func=lambda x: x.nbytes,
        largest_bin_first=False,
    )
18121825

18131826

18141827
def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]:

pyiceberg/table/__init__.py

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
from typing_extensions import Annotated
4848

4949
import pyiceberg.expressions.parser as parser
50-
import pyiceberg.expressions.visitors as visitors
5150
from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError
5251
from pyiceberg.expressions import (
5352
AlwaysFalse,
@@ -58,6 +57,12 @@
5857
Or,
5958
Reference,
6059
)
60+
from pyiceberg.expressions.visitors import (
61+
_InclusiveMetricsEvaluator,
62+
expression_evaluator,
63+
inclusive_projection,
64+
manifest_evaluator,
65+
)
6166
from pyiceberg.io import FileIO, load_file_io
6267
from pyiceberg.manifest import (
6368
POSITIONAL_DELETE_SCHEMA,
@@ -217,6 +222,9 @@ class TableProperties:
217222

218223
PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column"
219224

225+
WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes"
226+
WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024 # 512 MB
227+
220228
DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default"
221229
DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)"
222230

@@ -1130,8 +1138,9 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
11301138

11311139
_check_schema_compatible(self.schema(), other_schema=df.schema)
11321140
# cast if the two schemas are compatible but not equal
1133-
if self.schema().as_arrow() != df.schema:
1134-
df = df.cast(self.schema().as_arrow())
1141+
table_arrow_schema = self.schema().as_arrow()
1142+
if table_arrow_schema != df.schema:
1143+
df = df.cast(table_arrow_schema)
11351144

11361145
with self.transaction() as txn:
11371146
with txn.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
@@ -1171,8 +1180,9 @@ def overwrite(
11711180

11721181
_check_schema_compatible(self.schema(), other_schema=df.schema)
11731182
# cast if the two schemas are compatible but not equal
1174-
if self.schema().as_arrow() != df.schema:
1175-
df = df.cast(self.schema().as_arrow())
1183+
table_arrow_schema = self.schema().as_arrow()
1184+
if table_arrow_schema != df.schema:
1185+
df = df.cast(table_arrow_schema)
11761186

11771187
with self.transaction() as txn:
11781188
with txn.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as update_snapshot:
@@ -1442,9 +1452,7 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent
14421452
relevant_entries = positional_delete_entries[positional_delete_entries.bisect_right(data_entry) :]
14431453

14441454
if len(relevant_entries) > 0:
1445-
evaluator = visitors._InclusiveMetricsEvaluator(
1446-
POSITIONAL_DELETE_SCHEMA, EqualTo("file_path", data_entry.data_file.file_path)
1447-
)
1455+
evaluator = _InclusiveMetricsEvaluator(POSITIONAL_DELETE_SCHEMA, EqualTo("file_path", data_entry.data_file.file_path))
14481456
return {
14491457
positional_delete_entry.data_file
14501458
for positional_delete_entry in relevant_entries
@@ -1468,7 +1476,7 @@ def __init__(
14681476
super().__init__(table, row_filter, selected_fields, case_sensitive, snapshot_id, options, limit)
14691477

14701478
def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
1471-
project = visitors.inclusive_projection(self.table.schema(), self.table.specs()[spec_id])
1479+
project = inclusive_projection(self.table.schema(), self.table.specs()[spec_id])
14721480
return project(self.row_filter)
14731481

14741482
@cached_property
@@ -1477,7 +1485,7 @@ def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
14771485

14781486
def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
14791487
spec = self.table.specs()[spec_id]
1480-
return visitors.manifest_evaluator(spec, self.table.schema(), self.partition_filters[spec_id], self.case_sensitive)
1488+
return manifest_evaluator(spec, self.table.schema(), self.partition_filters[spec_id], self.case_sensitive)
14811489

14821490
def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
14831491
spec = self.table.specs()[spec_id]
@@ -1488,9 +1496,7 @@ def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]
14881496
# The lambda created here is run in multiple threads.
14891497
# So we avoid creating _EvaluatorExpression methods bound to a single
14901498
# shared instance across multiple threads.
1491-
return lambda data_file: visitors.expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(
1492-
data_file.partition
1493-
)
1499+
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
14941500

14951501
def _check_sequence_number(self, min_data_sequence_number: int, manifest: ManifestFile) -> bool:
14961502
"""Ensure that no manifests are loaded that contain deletes that are older than the data.
@@ -1535,7 +1541,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
15351541
# this filter depends on the partition spec used to write the manifest file
15361542

15371543
partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)
1538-
metrics_evaluator = visitors._InclusiveMetricsEvaluator(
1544+
metrics_evaluator = _InclusiveMetricsEvaluator(
15391545
self.table.schema(), self.row_filter, self.case_sensitive, self.options.get("include_empty_files") == "true"
15401546
).eval
15411547

@@ -2488,7 +2494,7 @@ def _add_and_move_fields(
24882494
class WriteTask:
24892495
write_uuid: uuid.UUID
24902496
task_id: int
2491-
df: pa.Table
2497+
record_batches: List[pa.RecordBatch]
24922498
sort_order_id: Optional[int] = None
24932499

24942500
# Later to be extended with partition information
@@ -2523,17 +2529,27 @@ def _dataframe_to_data_files(
25232529
Returns:
25242530
An iterable that supplies datafiles that represent the table.
25252531
"""
2526-
from pyiceberg.io.pyarrow import write_file
2532+
from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file
25272533

25282534
if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 0]) > 0:
25292535
raise ValueError("Cannot write to partitioned tables")
25302536

25312537
counter = itertools.count(0)
25322538
write_uuid = write_uuid or uuid.uuid4()
25332539

2540+
target_file_size = PropertyUtil.property_as_int(
2541+
properties=table_metadata.properties,
2542+
property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
2543+
default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
2544+
)
2545+
25342546
# This is an iter, so we don't have to materialize everything every time
25352547
# This will be more relevant when we start doing partitioned writes
2536-
yield from write_file(io=io, table_metadata=table_metadata, tasks=iter([WriteTask(write_uuid, next(counter), df)]))
2548+
yield from write_file(
2549+
io=io,
2550+
table_metadata=table_metadata,
2551+
tasks=iter([WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df, target_file_size)]), # type: ignore
2552+
)
25372553

25382554

25392555
def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]:

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# under the License.
1717
[tool.poetry]
1818
name = "pyiceberg"
19-
version = "0.6.0"
19+
version = "0.6.1"
2020
readme = "README.md"
2121
homepage = "https://py.iceberg.apache.org/"
2222
repository = "https://github.com/apache/iceberg-python"

tests/catalog/test_sql.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ def test_create_table_with_pyarrow_schema(
197197
'catalog',
198198
[
199199
lazy_fixture('catalog_memory'),
200-
# lazy_fixture('catalog_sqlite'),
200+
lazy_fixture('catalog_sqlite'),
201201
],
202202
)
203203
def test_write_pyarrow_schema(catalog: SqlCatalog, random_identifier: Identifier) -> None:
@@ -220,9 +220,6 @@ def test_write_pyarrow_schema(catalog: SqlCatalog, random_identifier: Identifier
220220
database_name, _table_name = random_identifier
221221
catalog.create_namespace(database_name)
222222
table = catalog.create_table(random_identifier, pyarrow_table.schema)
223-
print(pyarrow_table.schema)
224-
print(table.schema().as_struct())
225-
print()
226223
table.overwrite(pyarrow_table)
227224

228225

tests/conftest.py

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import socket
3131
import string
3232
import uuid
33-
from datetime import datetime
33+
from datetime import date, datetime
3434
from pathlib import Path
3535
from random import choice
3636
from tempfile import TemporaryDirectory
@@ -1988,3 +1988,60 @@ def spark() -> "SparkSession":
19881988
)
19891989

19901990
return spark
1991+
1992+
1993+
# Sample column data keyed by column name. Every column holds three values and
# the middle one is always None, so each column contains exactly one null.
TEST_DATA_WITH_NULL = {
    'bool': [False, None, True],
    'string': ['a', None, 'z'],
    # Go over the 16 bytes to kick in truncation
    'string_long': ['a' * 22, None, 'z' * 22],
    'int': [1, None, 9],
    'long': [1, None, 9],
    'float': [0.0, None, 0.9],
    'double': [0.0, None, 0.9],
    'timestamp': [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
    'timestamptz': [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
    'date': [date(2023, 1, 1), None, date(2023, 3, 1)],
    # Not supported by Spark
    # 'time': [time(1, 22, 0), None, time(19, 25, 0)],
    # Not natively supported by Arrow
    # 'uuid': [uuid.UUID('00000000-0000-0000-0000-000000000000').bytes, None, uuid.UUID('11111111-1111-1111-1111-111111111111').bytes],
    # Octal escapes: b'\01' is the single byte 0x01 and b'\22' is the single byte 0x12
    'binary': [b'\01', None, b'\22'],
    # 16-byte values, produced from the raw bytes of two UUIDs
    'fixed': [
        uuid.UUID('00000000-0000-0000-0000-000000000000').bytes,
        None,
        uuid.UUID('11111111-1111-1111-1111-111111111111').bytes,
    ],
}
2016+
2017+
2018+
@pytest.fixture(scope="session")
def pa_schema() -> "pa.Schema":
    """Return the Arrow schema for the null-containing sample table.

    The fields are listed as ``(name, type)`` pairs in column order.
    """
    import pyarrow as pa

    columns = [
        ("bool", pa.bool_()),
        ("string", pa.string()),
        ("string_long", pa.string()),
        ("int", pa.int32()),
        ("long", pa.int64()),
        ("float", pa.float32()),
        ("double", pa.float64()),
        ("timestamp", pa.timestamp(unit="us")),
        ("timestamptz", pa.timestamp(unit="us", tz="UTC")),
        ("date", pa.date32()),
        # Not supported by Spark
        # ("time", pa.time64("us")),
        # Not natively supported by Arrow
        # ("uuid", pa.fixed(16)),
        ("binary", pa.large_binary()),
        ("fixed", pa.binary(16)),
    ]
    return pa.schema(columns)
2040+
2041+
2042+
@pytest.fixture(scope="session")
def arrow_table_with_null(pa_schema: "pa.Schema") -> "pa.Table":
    """Return a PyArrow table with all kinds of columns.

    The table is built from ``TEST_DATA_WITH_NULL`` using the schema supplied
    by the ``pa_schema`` fixture.
    """
    # The docstring must be the first statement of the function; placed after
    # the import it is only a discarded string expression.
    import pyarrow as pa

    return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)

tests/integration/test_reads.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,6 @@ def test_ray_nan_rewritten(catalog: Catalog) -> None:
274274
def test_ray_not_nan_count(catalog: Catalog) -> None:
275275
table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten")
276276
ray_dataset = table_test_null_nan_rewritten.scan(row_filter=NotNaN("col_numeric"), selected_fields=("idx",)).to_ray()
277-
print(ray_dataset.take())
278277
assert ray_dataset.count() == 2
279278

280279

0 commit comments

Comments
 (0)