Skip to content

Commit 8d45920

Browse files
committed
Add option to delete datafiles
Deletes are performed through the Iceberg metadata, which makes them efficient when the data is partitioned correctly.
1 parent 4c1cfdc commit 8d45920

File tree

4 files changed

+200
-5
lines changed

4 files changed

+200
-5
lines changed

pyiceberg/table/__init__.py

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,12 @@
5050
import pyiceberg.expressions.visitors as visitors
5151
from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError
5252
from pyiceberg.expressions import (
53+
AlwaysFalse,
5354
AlwaysTrue,
5455
And,
5556
BooleanExpression,
5657
EqualTo,
58+
Or,
5759
Reference,
5860
)
5961
from pyiceberg.io import FileIO, load_file_io
@@ -2710,6 +2712,114 @@ def _commit(self) -> UpdatesAndRequirements:
27102712
)
27112713

27122714

2715+
class DeleteFiles(_MergingSnapshotProducer):
    """Snapshot producer that removes whole data files matching a predicate.

    The decision to drop a file is taken from metadata only: a manifest-level
    evaluator first skips manifests that cannot contain matches, and the
    strict/inclusive metrics evaluators then classify every remaining entry.
    Data files themselves are never rewritten; an entry that can neither be
    proven to match completely nor proven not to match raises ``ValueError``.
    """

    # Accumulated delete filter. Starts as AlwaysFalse so that nothing is
    # selected until delete() has been called at least once.
    _predicate: BooleanExpression

    def __init__(
        self,
        operation: Operation,
        transaction: Transaction,
        io: FileIO,
        commit_uuid: Optional[uuid.UUID] = None,
        snapshot_properties: Dict[str, str] = EMPTY_DICT,
    ):
        super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
        self._predicate = AlwaysFalse()

    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
        """Project the delete predicate onto the partition spec with the given id."""
        schema = self._transaction.table_metadata.schema()
        spec = self._transaction.table_metadata.specs()[spec_id]
        project = visitors.inclusive_projection(schema, spec)
        return project(self._predicate)

    @cached_property
    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
        """Return a mapping from spec id to the projected partition filter, built on demand."""
        return KeyDefaultDict(self._build_partition_projection)

    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
        """Build the callable that tells whether a manifest of this spec may hold matching files."""
        schema = self._transaction.table_metadata.schema()
        spec = self._transaction.table_metadata.specs()[spec_id]
        return visitors.manifest_evaluator(spec, schema, self.partition_filters[spec_id], case_sensitive=True)

    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
        """Build the callable that evaluates the partition filter against a data file's partition."""
        schema = self._transaction.table_metadata.schema()
        spec = self._transaction.table_metadata.specs()[spec_id]
        partition_type = spec.partition_type(schema)
        partition_schema = Schema(*partition_type.fields)
        partition_expr = self.partition_filters[spec_id]

        # NOTE(review): the evaluator is deliberately re-created on every call rather than
        # hoisted; presumably a shared evaluator instance would carry state between calls.
        return lambda data_file: visitors.expression_evaluator(partition_schema, partition_expr, case_sensitive=True)(
            data_file.partition
        )

    def delete(self, predicate: BooleanExpression) -> None:
        """Add a predicate; files matching any of the added predicates are deleted."""
        self._predicate = Or(self._predicate, predicate)

    @cached_property
    def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry]]:
        """Classify the current snapshot's manifests and entries against the predicate.

        Returns:
            A tuple of (manifests to carry into the new snapshot, entries marked as DELETED).

        Raises:
            ValueError: If a data file may contain both matching and non-matching rows,
                because that would require rewriting the data file.
        """
        schema = self._transaction.table_metadata.schema()

        def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry:
            return ManifestEntry(
                status=status,
                snapshot_id=entry.snapshot_id,
                data_sequence_number=entry.data_sequence_number,
                file_sequence_number=entry.file_sequence_number,
                data_file=entry.data_file,
            )

        manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
        strict_metrics_evaluator = visitors._StrictMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval
        inclusive_metrics_evaluator = visitors._InclusiveMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval

        existing_manifests: List[ManifestFile] = []
        total_deleted_entries: List[ManifestEntry] = []

        snapshot = self._transaction.table_metadata.current_snapshot()
        if not snapshot:
            # No snapshot yet, so there is nothing to delete.
            return existing_manifests, total_deleted_entries

        for num, manifest_file in enumerate(snapshot.manifests(io=self._io)):
            if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
                # If the manifest isn't relevant, we can just keep it in the manifest-list
                existing_manifests.append(manifest_file)
                continue

            # It is relevant, let's check out the content
            deleted_entries = []
            existing_entries = []
            for entry in manifest_file.fetch_manifest_entry(io=self._io):
                if strict_metrics_evaluator(entry.data_file) == visitors.ROWS_MUST_MATCH:
                    deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED))
                elif inclusive_metrics_evaluator(entry.data_file) == visitors.ROWS_CANNOT_MATCH:
                    existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
                else:
                    raise ValueError("Deletes do not support rewrites of data files")

            if not deleted_entries:
                # Nothing in this manifest is affected, so it is carried over untouched.
                existing_manifests.append(manifest_file)
                continue

            total_deleted_entries += deleted_entries

            # Rewrite the manifest so that it only holds the surviving entries. When every
            # entry was deleted there is nothing left to write and the manifest is dropped.
            if existing_entries:
                # NOTE(review): `num` is the manifest's position in the current manifest list;
                # confirm the resulting path cannot collide with other manifests written
                # under the same commit_uuid.
                output_file_location = _new_manifest_path(
                    location=self._transaction.table_metadata.location, num=num, commit_uuid=self.commit_uuid
                )
                with write_manifest(
                    format_version=self._transaction.table_metadata.format_version,
                    spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
                    schema=self._transaction.table_metadata.schema(),
                    output_file=self._io.new_output(output_file_location),
                    snapshot_id=self._snapshot_id,
                ) as writer:
                    for existing_entry in existing_entries:
                        writer.add_entry(existing_entry)
                # Register the rewritten manifest; without this the surviving data files
                # would silently disappear from the new snapshot.
                existing_manifests.append(writer.to_manifest_file())

        return existing_manifests, total_deleted_entries

    def _existing_manifests(self) -> List[ManifestFile]:
        """Return the manifests that are carried over into the new snapshot."""
        return self._compute_deletes[0]

    def _deleted_entries(self) -> List[ManifestEntry]:
        """Return the manifest entries that are marked as deleted by this operation."""
        return self._compute_deletes[1]
2821+
2822+
27132823
class FastAppendFiles(_MergingSnapshotProducer):
27142824
def _existing_manifests(self) -> List[ManifestFile]:
27152825
"""To determine if there are any existing manifest files.
@@ -2787,7 +2897,7 @@ class UpdateSnapshot:
27872897
_io: FileIO
27882898
_snapshot_properties: Dict[str, str]
27892899

2790-
def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str]) -> None:
2900+
def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
27912901
self._transaction = transaction
27922902
self._io = io
27932903
self._snapshot_properties = snapshot_properties
@@ -2807,6 +2917,14 @@ def overwrite(self) -> OverwriteFiles:
28072917
snapshot_properties=self._snapshot_properties,
28082918
)
28092919

2920+
def delete(self) -> DeleteFiles:
    """Create a DeleteFiles producer for this transaction using the DELETE operation."""
    delete_producer = DeleteFiles(
        operation=Operation.DELETE,
        transaction=self._transaction,
        io=self._io,
        snapshot_properties=self._snapshot_properties,
    )
    return delete_producer
2927+
28102928

28112929
class UpdateSpec(UpdateTableMetadata["UpdateSpec"]):
28122930
_transaction: Transaction

pyiceberg/table/snapshots.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ def get_prop(prop: str) -> int:
345345
def update_snapshot_summaries(
346346
summary: Summary, previous_summary: Optional[Mapping[str, str]] = None, truncate_full_table: bool = False
347347
) -> Summary:
348-
if summary.operation not in {Operation.APPEND, Operation.OVERWRITE}:
348+
if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE}:
349349
raise ValueError(f"Operation not implemented: {summary.operation}")
350350

351351
if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None:

tests/conftest.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import boto3
4747
import pytest
4848
from moto import mock_aws
49-
from pyspark.sql import SparkSession
5049

5150
from pyiceberg import schema
5251
from pyiceberg.catalog import Catalog, load_catalog
@@ -86,6 +85,7 @@
8685
if TYPE_CHECKING:
8786
import pyarrow as pa
8887
from moto.server import ThreadedMotoServer # type: ignore
88+
from pyspark.sql import SparkSession
8989

9090
from pyiceberg.io.pyarrow import PyArrowFileIO
9191

@@ -1954,9 +1954,10 @@ def session_catalog() -> Catalog:
19541954

19551955

19561956
@pytest.fixture(scope="session")
1957-
def spark() -> SparkSession:
1957+
def spark() -> "SparkSession":
19581958
import importlib.metadata
1959-
import os
1959+
1960+
from pyspark.sql import SparkSession
19601961

19611962
spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2])
19621963
scala_version = "2.12"

tests/integration/test_deletes.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
# pylint:disable=redefined-outer-name
18+
import pytest
19+
from pyspark.sql import DataFrame, SparkSession
20+
21+
from pyiceberg.catalog.rest import RestCatalog
22+
from pyiceberg.expressions import EqualTo
23+
24+
25+
@pytest.fixture
def test_deletes_table(spark: SparkSession) -> DataFrame:
    """Recreate the partitioned delete-test table through Spark and return it as a DataFrame.

    The table is partitioned by ``number_partitioned`` and filled by two separate
    inserts, one for partition value 10 and one for partition value 11.
    """
    identifier = 'default.table_partitioned_delete'

    # Executed in order: drop any leftover table, create it, then insert one batch per partition value.
    setup_statements = (
        f"DROP TABLE IF EXISTS {identifier}",
        f"""
            CREATE TABLE {identifier} (
                number_partitioned int,
                number int
            )
            USING iceberg
            PARTITIONED BY (number_partitioned)
        """,
        f"""
            INSERT INTO {identifier} VALUES (10, 20), (10, 30)
        """,
        f"""
            INSERT INTO {identifier} VALUES (11, 20), (11, 30)
        """,
    )
    for statement in setup_statements:
        spark.sql(statement)

    return spark.table(identifier)
53+
54+
55+
def test_partition_deletes(test_deletes_table: DataFrame, session_catalog: RestCatalog) -> None:
    """Delete on the partition column and check that only the other partition's rows remain."""
    identifier = 'default.table_partitioned_delete'
    table = session_catalog.load_table(identifier)

    # Both context managers commit on exit: first the snapshot update, then the transaction.
    with table.transaction() as txn, txn.update_snapshot().delete() as delete_files:
        delete_files.delete(EqualTo("number_partitioned", 10))

    remaining = table.scan().to_arrow().to_pydict()
    assert remaining == {'number_partitioned': [11, 11], 'number': [20, 30]}
65+
66+
67+
def test_deletes(test_deletes_table: DataFrame, session_catalog: RestCatalog) -> None:
    """Delete on a non-partition column and check that exactly the matching rows are gone."""
    identifier = 'default.table_partitioned_delete'

    tbl = session_catalog.load_table(identifier)

    with tbl.transaction() as txn:
        with txn.update_snapshot().delete() as delete:
            delete.delete(EqualTo("number", 30))

    # The fixture inserts (10, 20), (10, 30), (11, 20) and (11, 30). Removing every row with
    # number == 30 must leave one row per partition, both with number == 20. The rows are
    # sorted because the scan order across data files is not part of the contract.
    # NOTE(review): DeleteFiles raises ValueError when a data file may hold both matching and
    # non-matching rows, so this only passes if matching rows live in files of their own —
    # confirm how Spark lays out the inserted rows.
    result = tbl.scan().to_arrow().to_pydict()
    remaining_rows = sorted(zip(result['number_partitioned'], result['number']))
    assert remaining_rows == [(10, 20), (11, 20)]

0 commit comments

Comments
 (0)