
Commit 53a0b73

prevent adding duplicate files (#1036)
* prevent add_files from adding a file that is already referenced by the Iceberg table
* fix the method that searches for files already referenced by the table, and add docs
* move the function that locates duplicate files into add_files
* add a check_duplicate_files flag to the add_files API to match the behaviour of the Java API (see the usage sketch below)
* add the check_duplicate_files flag to the table-level API and add tests
* fix tests to check the newly added check_duplicate_files flag and fix checks
* fix linting
1 parent 0b487f2 commit 53a0b73
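
A minimal usage sketch of the new flag, based on the signatures in this commit; `tbl` stands for an already-loaded pyiceberg Table and the parquet paths are placeholders, not files from the commit:

# Sketch of the API after this commit; `tbl` is an existing pyiceberg Table and
# the paths below are illustrative only.
file_paths = [
    "s3://warehouse/default/data/file-1.parquet",
    "s3://warehouse/default/data/file-2.parquet",
]

# Default behaviour: raises ValueError if file_paths contains duplicates or if any
# path is already referenced by the table's current metadata.
tbl.add_files(file_paths=file_paths)

# Skip the already-referenced check (mirrors the Java API flag); duplicate entries
# within file_paths themselves are still rejected.
tbl.add_files(file_paths=file_paths, check_duplicate_files=False)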

2 files changed: +120 −4 lines changed

pyiceberg/table/__init__.py

Lines changed: 25 additions & 4 deletions
@@ -621,7 +621,9 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti
         if not delete_snapshot.files_affected and not delete_snapshot.rewrites_needed:
             warnings.warn("Delete operation did not match any records")
 
-    def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+    def add_files(
+        self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
+    ) -> None:
         """
         Shorthand API for adding files as data files to the table transaction.
 
@@ -630,7 +632,21 @@ def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, str] =
 
         Raises:
             FileNotFoundError: If the file does not exist.
+            ValueError: Raises a ValueError given file_paths contains duplicate files
+            ValueError: Raises a ValueError given file_paths already referenced by table
         """
+        if len(file_paths) != len(set(file_paths)):
+            raise ValueError("File paths must be unique")
+
+        if check_duplicate_files:
+            import pyarrow.compute as pc
+
+            expr = pc.field("file_path").isin(file_paths)
+            referenced_files = [file["file_path"] for file in self._table.inspect.files().filter(expr).to_pylist()]
+
+            if referenced_files:
+                raise ValueError(f"Cannot add files that are already referenced by table, files: {', '.join(referenced_files)}")
+
         if self.table_metadata.name_mapping() is None:
             self.set_properties(**{
                 TableProperties.DEFAULT_NAME_MAPPING: self.table_metadata.schema().name_mapping.model_dump_json()
@@ -1632,7 +1648,9 @@ def delete(
         with self.transaction() as tx:
             tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties)
 
-    def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+    def add_files(
+        self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
+    ) -> None:
         """
         Shorthand API for adding files as data files to the table.
 
@@ -1643,7 +1661,9 @@ def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, str] =
             FileNotFoundError: If the file does not exist.
         """
         with self.transaction() as tx:
-            tx.add_files(file_paths=file_paths, snapshot_properties=snapshot_properties)
+            tx.add_files(
+                file_paths=file_paths, snapshot_properties=snapshot_properties, check_duplicate_files=check_duplicate_files
+            )
 
     def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
         return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)
@@ -2260,7 +2280,8 @@ def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema:
         visit_with_partner(
             Catalog._convert_schema_if_needed(new_schema),
             -1,
-            UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),  # type: ignore
+            UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),
+            # type: ignore
             PartnerIdByNameAccessor(partner_schema=self._schema, case_sensitive=self._case_sensitive),
         )
         return self
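
The check added to Transaction.add_files filters the table's files metadata with pyarrow.compute. A standalone sketch of the same idea, assuming an existing Table handle `tbl` and an illustrative candidate path list:

import pyarrow.compute as pc

# Illustrative inputs: `tbl` is an existing pyiceberg Table and the candidate path
# is a placeholder; the logic mirrors the check added to Transaction.add_files above.
candidate_paths = ["s3://warehouse/default/data/file-1.parquet"]

# inspect.files() returns a pyarrow Table describing the data files of the current
# snapshot; filter it to rows whose file_path is among the candidates.
expr = pc.field("file_path").isin(candidate_paths)
already_referenced = [row["file_path"] for row in tbl.inspect.files().filter(expr).to_pylist()]

if already_referenced:
    raise ValueError(f"Cannot add files that are already referenced by table, files: {', '.join(already_referenced)}")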

tests/integration/test_add_files.py

Lines changed: 95 additions & 0 deletions
@@ -732,3 +732,98 @@ def test_add_files_subset_of_schema(spark: SparkSession, session_catalog: Catalo
     for column in written_arrow_table.column_names:
         for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
             assert left == right
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_add_files_with_duplicate_files_in_file_paths(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = f"default.test_table_duplicate_add_files_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, format_version)
+    file_path = f"s3://warehouse/default/unpartitioned/v{format_version}/test-1.parquet"
+    file_paths = [file_path, file_path]
+
+    # add the parquet files as data files
+    with pytest.raises(ValueError) as exc_info:
+        tbl.add_files(file_paths=file_paths)
+    assert "File paths must be unique" in str(exc_info.value)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_add_files_that_referenced_by_current_snapshot(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    identifier = f"default.test_table_add_referenced_file_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, format_version)
+
+    file_paths = [f"s3://warehouse/default/unpartitioned/v{format_version}/test-{i}.parquet" for i in range(5)]
+
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(ARROW_TABLE)
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=file_paths)
+    existing_files_in_table = tbl.inspect.files().to_pylist().pop()["file_path"]
+
+    with pytest.raises(ValueError) as exc_info:
+        tbl.add_files(file_paths=[existing_files_in_table])
+    assert f"Cannot add files that are already referenced by table, files: {existing_files_in_table}" in str(exc_info.value)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_files_false(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    identifier = f"default.test_table_add_referenced_file_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, format_version)
+
+    file_paths = [f"s3://warehouse/default/unpartitioned/v{format_version}/test-{i}.parquet" for i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(ARROW_TABLE)
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=file_paths)
+    existing_files_in_table = tbl.inspect.files().to_pylist().pop()["file_path"]
+    tbl.add_files(file_paths=[existing_files_in_table], check_duplicate_files=False)
+    rows = spark.sql(
+        f"""
+        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
+        FROM {identifier}.all_manifests
+    """
+    ).collect()
+    assert [row.added_data_files_count for row in rows] == [5, 1, 5]
+    assert [row.existing_data_files_count for row in rows] == [0, 0, 0]
+    assert [row.deleted_data_files_count for row in rows] == [0, 0, 0]
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_files_true(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    identifier = f"default.test_table_add_referenced_file_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, format_version)
+
+    file_paths = [f"s3://warehouse/default/unpartitioned/v{format_version}/test-{i}.parquet" for i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(ARROW_TABLE)
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=file_paths)
+    existing_files_in_table = tbl.inspect.files().to_pylist().pop()["file_path"]
+    with pytest.raises(ValueError) as exc_info:
+        tbl.add_files(file_paths=[existing_files_in_table], check_duplicate_files=True)
+    assert f"Cannot add files that are already referenced by table, files: {existing_files_in_table}" in str(exc_info.value)
