Commit 5025b4a

Moar tests
1 parent 1723819 commit 5025b4a

2 files changed: +149 −35 lines changed

pyiceberg/table/__init__.py

Lines changed: 16 additions & 5 deletions
@@ -55,6 +55,7 @@
     And,
     BooleanExpression,
     EqualTo,
+    Not,
     Or,
     Reference,
 )
@@ -240,6 +241,8 @@ class TableProperties:
     WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit"
     WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0
 
+    DELETE_MODE = "write.delete.mode"
+
     DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
     FORMAT_VERSION = "format-version"
     DEFAULT_FORMAT_VERSION = 2
@@ -457,11 +460,18 @@ def overwrite(
             update_snapshot.append_data_file(data_file)
 
     def delete(self, delete_filter: BooleanExpression, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        if (mode := self.table_metadata.properties.get(TableProperties.DELETE_MODE)) and mode != 'copy-on-write':
+            warnings.warn("PyIceberg only supports copy on write")
+
         with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
             delete_snapshot.delete_by_predicate(delete_filter)  # type: ignore
 
         # Check if there are any files that require an actual rewrite of a data file
-        if delete_snapshot.rewrites_needed:  # type: ignore
+        if delete_snapshot.rewrites_needed is True:  # type: ignore
+            # When we want to filter out certain rows, we want to invert the expression:
+            # delete id = 22 means that we want to look for that value, and then remove
+            # it from the Parquet file
+            delete_row_filter = Not(delete_filter)
             with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as overwrite_snapshot:
                 # Potential optimization is where we check if the files actually contain relevant data.
                 files = self._scan(row_filter=delete_filter).plan_files()
@@ -480,7 +490,7 @@ def delete(self, delete_filter: BooleanExpression, snapshot_properties: Dict[str
                         tasks=[original_file],
                         table_metadata=self._table.metadata,
                         io=self._table.io,
-                        row_filter=delete_filter,
+                        row_filter=delete_row_filter,
                         projected_schema=self.table_metadata.schema(),
                     )
                     for data_file in _dataframe_to_data_files(
@@ -3100,11 +3110,12 @@ def _existing_manifests(self) -> List[ManifestFile]:
         if snapshot := self._transaction.table_metadata.current_snapshot():
             for manifest_file in snapshot.manifests(io=self._io):
                 entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True)
-                found_deletes = [_ for entry in entries if entry in self._deleted_data_files]
+                found_deleted_data_files = [entry.data_file for entry in entries if entry.data_file in self._deleted_data_files]
 
-                if len(found_deletes) == 0:
+                if len(found_deleted_data_files) == 0:
                     existing_files.append(manifest_file)
                 else:
+                    # We have to rewrite the manifest file without the deleted data files
                     output_file_location = _new_manifest_path(
                         location=self._transaction.table_metadata.location,
                         num=next(self._manifest_counter),
@@ -3128,7 +3139,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
                                 )
                             )
                             for entry in entries
-                            if entry not in found_deletes
+                            if entry.data_file not in found_deleted_data_files
                         ]
                     existing_files.append(writer.to_manifest_file())
         return existing_files
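
For readers following the change to Transaction.delete(): a minimal usage sketch of the copy-on-write flow above. The catalog name and table identifier are taken from the integration tests below and are illustrative only, not part of this diff.

# Sketch only: exercises the new delete() path. The filter selects the rows
# to remove; internally delete() inverts it with Not(delete_filter) so the
# rewritten data files keep every row the filter does NOT match.
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo

catalog = load_catalog("default")  # assumed catalog name
tbl = catalog.load_table("default.table_partitioned_delete")  # assumed table

# Drops all rows of the matching partition via a copy-on-write rewrite
tbl.delete(EqualTo("number_partitioned", 10))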

tests/integration/test_deletes.py

Lines changed: 133 additions & 30 deletions
@@ -15,56 +15,159 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint:disable=redefined-outer-name
+from typing import List
+
 import pytest
-from pyspark.sql import DataFrame, SparkSession
+from pyspark.sql import SparkSession
 
 from pyiceberg.catalog.rest import RestCatalog
 from pyiceberg.expressions import EqualTo
 
 
-@pytest.fixture
-def test_deletes_table(spark: SparkSession) -> DataFrame:
+def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None:
+    for sql in sqls:
+        spark.sql(sql)
+
+
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
     identifier = 'default.table_partitioned_delete'
 
-    spark.sql(f"DROP TABLE IF EXISTS {identifier}")
-
-    spark.sql(
-        f"""
-        CREATE TABLE {identifier} (
-            number_partitioned int,
-            number int
-        )
-        USING iceberg
-        PARTITIONED BY (number_partitioned)
-    """
-    )
-    spark.sql(
-        f"""
-        INSERT INTO {identifier} VALUES (10, 20), (10, 30)
-    """
-    )
-    spark.sql(
-        f"""
-        INSERT INTO {identifier} VALUES (11, 20), (11, 30)
-    """
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned int,
+                number int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES('format-version' = {format_version})
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (10, 20), (10, 30)
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (11, 20), (11, 30)
+        """,
+        ],
     )
 
-    return spark.table(identifier)
+    tbl = session_catalog.load_table(identifier)
+    tbl.delete(EqualTo("number_partitioned", 10))
+
+    # No overwrite operation
+    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append', 'append', 'delete']
+    assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [11, 11], 'number': [20, 30]}
 
 
-def test_partition_deletes(test_deletes_table: DataFrame, session_catalog: RestCatalog) -> None:
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_partitioned_table_rewrite(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
     identifier = 'default.table_partitioned_delete'
 
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned int,
+                number int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES('format-version' = {format_version})
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (10, 20), (10, 30)
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (11, 20), (11, 30)
+        """,
+        ],
+    )
+
     tbl = session_catalog.load_table(identifier)
-    tbl.delete(EqualTo("number_partitioned", 10))
+    tbl.delete(EqualTo("number", 20))
 
+    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append', 'append', 'delete', 'overwrite']
     assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [11, 11], 'number': [20, 30]}
 
 
-def test_deletes(test_deletes_table: DataFrame, session_catalog: RestCatalog) -> None:
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_partitioned_table_no_match(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
     identifier = 'default.table_partitioned_delete'
 
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned int,
+                number int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES('format-version' = {format_version})
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (10, 20), (10, 30)
+        """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+    tbl.delete(EqualTo("number_partitioned", 22))  # Does not affect any data
+
+    # Open for discussion, do we want to create a new snapshot?
+    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append', 'delete']
+    assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [10, 10], 'number': [20, 30]}
+
+
+def test_partitioned_table_positional_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
+    identifier = 'default.table_partitioned_delete'
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned int,
+                number int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES(
+                'format-version' = 2,
+                'write.delete.mode'='merge-on-read',
+                'write.update.mode'='merge-on-read',
+                'write.merge.mode'='merge-on-read'
+            )
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (10, 20), (10, 30), (10, 40)
+        """,
+            # Generate a positional delete
+            f"""
+            DELETE FROM {identifier} WHERE number = 30
+        """,
+        ],
+    )
+
     tbl = session_catalog.load_table(identifier)
-    tbl.delete(EqualTo("number", 30))
 
-    assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [11, 11], 'number': [20, 20]}
+    # Assert that there is just a single Parquet file
+    assert len(list(tbl.scan().plan_files())) == 1
+
+    # Will rewrite a data file with a positional delete
+    tbl.delete(EqualTo("number", 40))
+
+    # Yet another wrong status by Spark
+    # One positional delete has been added, but an OVERWRITE status is set
+    # Related issue https://github.com/apache/iceberg/issues/9995
+    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append', 'overwrite', 'delete']
+    assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [10], 'number': [20]}
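
As a companion to these tests, a hedged sketch (not part of this commit) of how the write.delete.mode warning added in pyiceberg/table/__init__.py could be asserted; the session_catalog fixture and the table identifier mirror the tests above and are assumptions.

# Sketch only: a table configured for merge-on-read deletes should trigger the
# "PyIceberg only supports copy on write" warning when delete() is called,
# since PyIceberg falls back to a copy-on-write rewrite.
import warnings

from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.expressions import EqualTo


def assert_copy_on_write_warning(session_catalog: RestCatalog, identifier: str) -> None:
    tbl = session_catalog.load_table(identifier)
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        tbl.delete(EqualTo("number", 40))
    assert any("copy on write" in str(w.message) for w in caught)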
