
Commit 1e6ec0e

Use original partition-spec-id when marking a file as deleted (#984)
Co-authored-by: Sung Yun <[email protected]>
1 parent 6a6e0ab commit 1e6ec0e
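
Before this change, `_write_delete_manifest` wrote a single delete manifest using the table's current partition spec. After partition evolution, data files written under an earlier spec would then be recorded against the wrong spec-id. The fix groups the deleted entries by each data file's original `spec_id` and writes one delete manifest per partition spec; a standalone sketch of the grouping pattern follows the pyiceberg/table/__init__.py diff below.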

File tree: 2 files changed (+106, -11)

pyiceberg/table/__init__.py

Lines changed: 16 additions & 10 deletions
@@ -3135,16 +3135,22 @@ def _write_delete_manifest() -> List[ManifestFile]:
             # Check if we need to mark the files as deleted
             deleted_entries = self._deleted_entries()
             if len(deleted_entries) > 0:
-                with write_manifest(
-                    format_version=self._transaction.table_metadata.format_version,
-                    spec=self._transaction.table_metadata.spec(),
-                    schema=self._transaction.table_metadata.schema(),
-                    output_file=self.new_manifest_output(),
-                    snapshot_id=self._snapshot_id,
-                ) as writer:
-                    for delete_entry in deleted_entries:
-                        writer.add_entry(delete_entry)
-                return [writer.to_manifest_file()]
+                deleted_manifests = []
+                partition_groups: Dict[int, List[ManifestEntry]] = defaultdict(list)
+                for deleted_entry in deleted_entries:
+                    partition_groups[deleted_entry.data_file.spec_id].append(deleted_entry)
+                for spec_id, entries in partition_groups.items():
+                    with write_manifest(
+                        format_version=self._transaction.table_metadata.format_version,
+                        spec=self._transaction.table_metadata.specs()[spec_id],
+                        schema=self._transaction.table_metadata.schema(),
+                        output_file=self.new_manifest_output(),
+                        snapshot_id=self._snapshot_id,
+                    ) as writer:
+                        for entry in entries:
+                            writer.add_entry(entry)
+                    deleted_manifests.append(writer.to_manifest_file())
+                return deleted_manifests
             else:
                 return []
 
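
For illustration, here is a minimal, self-contained sketch of the grouping pattern the patch applies. The `Entry` dataclass and `group_by_spec` helper are hypothetical stand-ins for this sketch only; the real code above uses pyiceberg's `ManifestEntry` and `write_manifest`.

from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Entry:
    path: str
    spec_id: int  # partition-spec-id the data file was written under


def group_by_spec(entries: List[Entry]) -> Dict[int, List[Entry]]:
    # Bucket deleted entries by their original spec_id so that each delete
    # manifest can be written with the spec its data files actually used.
    groups: Dict[int, List[Entry]] = defaultdict(list)
    for entry in entries:
        groups[entry.spec_id].append(entry)
    return groups


entries = [Entry("a.parquet", 0), Entry("b.parquet", 1), Entry("c.parquet", 0)]
assert group_by_spec(entries) == {0: [entries[0], entries[2]], 1: [entries[1]]}

One manifest is then written per group, each with the matching spec, instead of a single manifest written with the table's current spec.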

tests/integration/test_deletes.py

Lines changed: 90 additions & 1 deletion
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint:disable=redefined-outer-name
+from datetime import datetime
 from typing import List
 
 import pyarrow as pa
@@ -25,9 +26,11 @@
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.expressions import AlwaysTrue, EqualTo
 from pyiceberg.manifest import ManifestEntryStatus
+from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table.snapshots import Operation, Summary
-from pyiceberg.types import FloatType, IntegerType, NestedField
+from pyiceberg.transforms import IdentityTransform
+from pyiceberg.types import FloatType, IntegerType, LongType, NestedField, TimestampType
 
 
 def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None:
@@ -556,3 +559,89 @@ def test_delete_overwrite_table_with_nan(session_catalog: RestCatalog) -> None:
     assert 2.0 in result
     assert 3.0 in result
     assert 4.0 in result
+
+
+@pytest.mark.integration
+def test_delete_after_partition_evolution_from_unpartitioned(session_catalog: RestCatalog) -> None:
+    identifier = "default.test_delete_after_partition_evolution_from_unpartitioned"
+
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array([2, 3, 4, 5, 6]),
+        ],
+        names=["idx"],
+    )
+
+    try:
+        session_catalog.drop_table(identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(
+        identifier,
+        schema=Schema(
+            NestedField(1, "idx", LongType()),
+        ),
+    )
+
+    tbl.append(arrow_table)
+
+    with tbl.transaction() as tx:
+        with tx.update_schema() as schema:
+            schema.rename_column("idx", "id")
+        with tx.update_spec() as spec:
+            spec.add_field("id", IdentityTransform())
+
+    # Append one more time to create data files with two partition specs
+    tbl.append(arrow_table.rename_columns(["id"]))
+
+    tbl.delete("id == 4")
+
+    # Expect 8 records: 10 records - 2
+    assert len(tbl.scan().to_arrow()) == 8
+
+
+@pytest.mark.integration
+def test_delete_after_partition_evolution_from_partitioned(session_catalog: RestCatalog) -> None:
+    identifier = "default.test_delete_after_partition_evolution_from_partitioned"
+
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array([2, 3, 4, 5, 6]),
+            pa.array([
+                datetime(2021, 5, 19),
+                datetime(2022, 7, 25),
+                datetime(2023, 3, 22),
+                datetime(2024, 7, 17),
+                datetime(2025, 2, 22),
+            ]),
+        ],
+        names=["idx", "ts"],
+    )
+
+    try:
+        session_catalog.drop_table(identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(
+        identifier,
+        schema=Schema(NestedField(1, "idx", LongType()), NestedField(2, "ts", TimestampType())),
+        partition_spec=PartitionSpec(PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="ts")),
+    )
+
+    tbl.append(arrow_table)
+
+    with tbl.transaction() as tx:
+        with tx.update_schema() as schema:
+            schema.rename_column("idx", "id")
+        with tx.update_spec() as spec:
+            spec.add_field("id", IdentityTransform())
+
+    # Append one more time to create data files with two partition specs
+    tbl.append(arrow_table.rename_columns(["id", "ts"]))
+
+    tbl.delete("id == 4")
+
+    # Expect 8 records: 10 records - 2
+    assert len(tbl.scan().to_arrow()) == 8
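
Both tests follow the same arc: append under the initial spec, evolve the spec inside a transaction, append again so the table holds data files under two specs, then delete. Since `id == 4` matches one row in each of the two appended batches, 2 of the 10 rows are removed. A quick way to confirm the two-spec state the tests depend on, sketched against pyiceberg's public catalog API (the catalog name and configuration are assumptions):

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")  # assumes a catalog named "default" is configured
tbl = catalog.load_table("default.test_delete_after_partition_evolution_from_unpartitioned")

# After the evolution above, the table carries both partition specs:
# the original unpartitioned spec (id 0) and the evolved identity spec (id 1).
print(tbl.specs())         # {0: PartitionSpec(...), 1: PartitionSpec(...)}
print(tbl.spec().spec_id)  # 1, the current spec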
