Changes from 2 commits
8 changes: 5 additions & 3 deletions mkdocs/docs/api.md
@@ -1006,9 +1006,11 @@ Expert Iceberg users may choose to commit existing parquet files to the Iceberg

<!-- prettier-ignore-start -->

!!! note "Name Mapping"
Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg's schema, it requires the Iceberg's table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (The Name mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet file's metadata, and creates a new Name Mapping based on the table's current schema if the table doesn't already have one.

!!! note "Name Mapping and Field IDs"
`add_files` can work with Parquet files both with and without field IDs in their metadata:
    - **Files with field IDs**: When field IDs are present in the Parquet metadata, they must match the corresponding field IDs in the Iceberg table schema. This is common for files generated by tools like Spark or other libraries that write explicit field ID metadata.
- **Files without field IDs**: When field IDs are absent, the table must have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) to map field names to Iceberg field IDs. `add_files` will automatically create a Name Mapping based on the table's current schema if one doesn't already exist.
In both cases, a Name Mapping is created if the table doesn't have one, ensuring compatibility with various readers.
Contributor
For Parquet files with field IDs, I don't think we necessarily need the name mapping if they are aligned with the table schema field IDs.
But we can address this separately.
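To make the documented behavior concrete, here is a minimal sketch (not part of this diff) of adding a Parquet file whose metadata carries field IDs that match the table schema. The catalog name, table identifier, and warehouse path are illustrative assumptions; the write pattern mirrors the integration tests below.

```python
import pyarrow as pa
import pyarrow.parquet as pq

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")            # assumed catalog configuration
tbl = catalog.load_table("default.events")   # hypothetical table with fields foo (id 1) and bar (id 2)

# Parquet schema whose PARQUET:field_id metadata matches the Iceberg field IDs.
schema_with_ids = pa.schema(
    [
        pa.field("foo", pa.bool_(), nullable=False, metadata={"PARQUET:field_id": "1"}),
        pa.field("bar", pa.string(), nullable=False, metadata={"PARQUET:field_id": "2"}),
    ]
)
data = pa.table({"foo": [True], "bar": ["a"]}, schema=schema_with_ids)

file_path = "s3://warehouse/default/events/data-0.parquet"  # illustrative path
with tbl.io.new_output(file_path).create(overwrite=True) as fos:
    with pq.ParquetWriter(fos, schema=schema_with_ids) as writer:
        writer.write_table(data)

# With matching field IDs (or with no field IDs at all) this is expected to succeed;
# a Name Mapping is created on the table if it does not already exist.
tbl.add_files(file_paths=[file_path])
```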

!!! note "Partitions"
`add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform`, and `TruncateTransform` which preserve the order of the values after the transformation (Any Transform that has the `preserves_order` property set to True is supported). Please note that if the column statistics of the `PartitionField`'s source column are not present in the parquet metadata, the partition value is inferred as `None`.
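For the partition note, a hedged sketch of adding a file to a table partitioned by a `MonthTransform`; the catalog, identifiers, schema, and warehouse path are assumptions. The partition value is inferred from the Parquet column statistics of the source column, so all rows in a file should fall into the same partition.

```python
from datetime import datetime

import pyarrow as pa
import pyarrow.parquet as pq

from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import MonthTransform
from pyiceberg.types import NestedField, StringType, TimestampType

catalog = load_catalog("default")  # assumed catalog configuration

schema = Schema(
    NestedField(1, "event_ts", TimestampType(), required=False),
    NestedField(2, "payload", StringType(), required=False),
)
spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=MonthTransform(), name="event_ts_month")
)
tbl = catalog.create_table("default.events_by_month", schema=schema, partition_spec=spec)

# Both rows fall into the same month, so the partition value can be inferred
# from the event_ts min/max statistics in the Parquet footer.
data = pa.table(
    {
        "event_ts": pa.array([datetime(2024, 3, 1), datetime(2024, 3, 7)], type=pa.timestamp("us")),
        "payload": ["a", "b"],
    }
)
file_path = "s3://warehouse/default/events_by_month/data-0.parquet"  # illustrative path
with tbl.io.new_output(file_path).create(overwrite=True) as fos:
    with pq.ParquetWriter(fos, schema=data.schema) as writer:
        writer.write_table(data)

tbl.add_files(file_paths=[file_path])  # partition value event_ts_month=2024-03 is inferred
```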

42 changes: 37 additions & 5 deletions pyiceberg/io/pyarrow.py
@@ -2610,7 +2610,10 @@ def _check_pyarrow_schema_compatible(
Raises:
ValueError: If the schemas are not compatible.
"""
# Check if the PyArrow schema has explicit field IDs
has_field_ids = visit_pyarrow(provided_schema, _HasIds())
name_mapping = requested_schema.name_mapping

try:
provided_schema = pyarrow_to_schema(
provided_schema,
@@ -2624,8 +2627,41 @@
)
additional_names = set(provided_schema._name_to_id.keys()) - set(requested_schema._name_to_id.keys())
raise ValueError(
f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. "
"Update the schema first (hint, use union_by_name)."
) from e

# If the file has explicit field IDs, validate they match the table schema exactly
if has_field_ids:
# Build mappings for both schemas (including nested fields)
requested_id_to_name = requested_schema._lazy_id_to_name
provided_id_to_name = provided_schema._lazy_id_to_name
Contributor

Hey @jeroko, thanks for working on this and adding this check.

However, I don't think we really care about the names; it is not a problem when they differ. But if you add a file with a different schema, we can brick the table because of issues in the types. Should we check that the file contains the expected type for each of the IDs instead?

Author (@jeroko), Oct 27, 2025

@Fokko Right, we should not care about the names if the IDs are provided, and the mapping between the IDs and the types was already checked in the call to _check_schema_compatible at the end of this function. In that case I didn't really need to add any extra check, just a new test to verify that files with matching field IDs and incompatible types fail.


# Also build reverse mapping: path -> field_id for the table
requested_name_to_id = {path: field_id for field_id, path in requested_id_to_name.items()}

# Check that all field paths in the file have matching field IDs in the table
mismatched_fields = []
for field_id, provided_path in provided_id_to_name.items():
# Check if this path exists in the table schema
expected_field_id = requested_name_to_id.get(provided_path)
if expected_field_id is None:
# The file has a field path that doesn't exist in the table at all
# This will be caught by _check_schema_compatible later, so skip it here
continue
elif expected_field_id != field_id:
# Same path, different field ID - this is the critical error
mismatched_fields.append(
f"'{provided_path}': table expects field_id={expected_field_id}, file has field_id={field_id}"
)

if mismatched_fields:
raise ValueError(
"Field IDs in Parquet file do not match table schema. When field IDs are explicitly set in the "
"Parquet metadata, they must match the Iceberg table schema.\nMismatched fields:\n"
+ "\n".join(f" - {field}" for field in mismatched_fields)
)

_check_schema_compatible(requested_schema, provided_schema)
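
Following the review thread above, here is a sketch of the test @jeroko describes: matching field IDs but an incompatible type. It is illustrative only, assumes the fixtures and helpers of tests/integration/test_add_files.py (`_create_table`, `session_catalog`, `format_version`), and the test name and paths are hypothetical.

```python
@pytest.mark.integration
def test_add_files_with_matching_field_ids_incompatible_types(
    spark: SparkSession, session_catalog: Catalog, format_version: int
) -> None:
    """Sketch only: field IDs match the table schema, but `baz` carries an incompatible type."""
    identifier = f"default.matching_ids_incompatible_types_v{format_version}"
    tbl = _create_table(session_catalog, identifier, format_version)

    # Table schema (from _create_table): 1=foo bool, 2=bar string, 3=baz int, 4=qux date.
    incompatible_schema = pa.schema(
        [
            pa.field("foo", pa.bool_(), nullable=False, metadata={"PARQUET:field_id": "1"}),
            pa.field("bar", pa.string(), nullable=False, metadata={"PARQUET:field_id": "2"}),
            pa.field("baz", pa.string(), nullable=False, metadata={"PARQUET:field_id": "3"}),  # int expected
            pa.field("qux", pa.date32(), nullable=False, metadata={"PARQUET:field_id": "4"}),
        ]
    )

    file_path = f"s3://warehouse/default/matching_ids_incompatible_types/v{format_version}/test.parquet"
    fo = tbl.io.new_output(file_path)
    with fo.create(overwrite=True) as fos:
        with pq.ParquetWriter(fos, schema=incompatible_schema) as writer:
            writer.write_table(
                pa.table(
                    {"foo": [True], "bar": ["bar_string"], "baz": ["123"], "qux": [date(2024, 3, 7)]},
                    schema=incompatible_schema,
                )
            )

    # The field IDs line up, so the failure is expected to come from the type check
    # in _check_schema_compatible (exact error message not asserted here).
    with pytest.raises(ValueError):
        tbl.add_files(file_paths=[file_path])
```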


@@ -2641,10 +2677,6 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa
parquet_metadata = pq.read_metadata(input_stream)

arrow_schema = parquet_metadata.schema.to_arrow_schema()
if visit_pyarrow(arrow_schema, _HasIds()):
raise NotImplementedError(
f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
)

schema = table_metadata.schema()
_check_pyarrow_schema_compatible(schema, arrow_schema, format_version=table_metadata.format_version)
140 changes: 134 additions & 6 deletions tests/integration/test_add_files.py
@@ -47,6 +47,7 @@
LongType,
NestedField,
StringType,
StructType,
TimestampType,
TimestamptzType,
)
@@ -216,23 +217,150 @@ def test_add_files_to_unpartitioned_table_raises_file_not_found(


@pytest.mark.integration
def test_add_files_to_unpartitioned_table_raises_has_field_ids(
def test_add_files_to_unpartitioned_table_with_field_ids(
spark: SparkSession, session_catalog: Catalog, format_version: int
) -> None:
identifier = f"default.unpartitioned_raises_field_ids_v{format_version}"
identifier = f"default.unpartitioned_with_field_ids_v{format_version}"
tbl = _create_table(session_catalog, identifier, format_version)

file_paths = [f"s3://warehouse/default/unpartitioned_raises_field_ids/v{format_version}/test-{i}.parquet" for i in range(5)]
# write parquet files
file_paths = [f"s3://warehouse/default/unpartitioned_with_field_ids/v{format_version}/test-{i}.parquet" for i in range(5)]
# write parquet files with field IDs matching the table schema
for file_path in file_paths:
fo = tbl.io.new_output(file_path)
with fo.create(overwrite=True) as fos:
with pq.ParquetWriter(fos, schema=ARROW_SCHEMA_WITH_IDS) as writer:
writer.write_table(ARROW_TABLE_WITH_IDS)

# add the parquet files as data files
with pytest.raises(NotImplementedError):
tbl.add_files(file_paths=file_paths)
tbl.add_files(file_paths=file_paths)

# NameMapping should still be set even though files have field IDs
assert tbl.name_mapping() is not None

# Verify files were added successfully
rows = spark.sql(
f"""
SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
FROM {identifier}.all_manifests
"""
).collect()

assert [row.added_data_files_count for row in rows] == [5]
assert [row.existing_data_files_count for row in rows] == [0]
assert [row.deleted_data_files_count for row in rows] == [0]

# Verify data can be read back correctly
df = spark.table(identifier).toPandas()
assert len(df) == 5
assert all(df["foo"] == True) # noqa: E712
assert all(df["bar"] == "bar_string")
assert all(df["baz"] == 123)
assert all(df["qux"] == date(2024, 3, 7))


@pytest.mark.integration
def test_add_files_with_mismatched_field_ids(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
identifier = f"default.unpartitioned_mismatched_field_ids_v{format_version}"
tbl = _create_table(session_catalog, identifier, format_version)

# Create schema with field IDs that don't match the table schema
# Table has: 1=foo, 2=bar, 3=baz, 4=qux (assigned by catalog)
# This file has: 1=foo, 2=bar, 5=baz, 6=qux (wrong IDs for baz and qux)
mismatched_schema = pa.schema(
[
pa.field("foo", pa.bool_(), nullable=False, metadata={"PARQUET:field_id": "1"}),
pa.field("bar", pa.string(), nullable=False, metadata={"PARQUET:field_id": "2"}),
pa.field("baz", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "5"}), # Wrong: should be 3
pa.field("qux", pa.date32(), nullable=False, metadata={"PARQUET:field_id": "6"}), # Wrong: should be 4
]
)

file_path = f"s3://warehouse/default/unpartitioned_mismatched_field_ids/v{format_version}/test.parquet"
fo = tbl.io.new_output(file_path)
with fo.create(overwrite=True) as fos:
with pq.ParquetWriter(fos, schema=mismatched_schema) as writer:
writer.write_table(ARROW_TABLE_WITH_IDS)

# Adding files with mismatched field IDs should fail
with pytest.raises(ValueError, match="Field IDs in Parquet file do not match table schema"):
tbl.add_files(file_paths=[file_path])


@pytest.mark.integration
def test_add_files_with_mismatched_nested_field_ids(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
"""Test that files with mismatched nested (struct) field IDs are rejected."""
identifier = f"default.nested_mismatched_field_ids_v{format_version}"

# Create a table with a nested struct field
try:
session_catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass

nested_schema = Schema(
NestedField(1, "id", IntegerType(), required=False),
NestedField(
2,
"user",
StructType(
NestedField(3, "name", StringType(), required=False),
NestedField(4, "age", IntegerType(), required=False),
),
required=False,
),
schema_id=0,
)

tbl = session_catalog.create_table(
identifier=identifier,
schema=nested_schema,
properties={"format-version": str(format_version)},
)

# Create PyArrow schema with MISMATCHED nested field IDs
# The table expects: id=1, user=2, user.name=3, user.age=4
# This file has: id=1, user=2, user.name=99, user.age=100 (wrong nested IDs)
pa_schema_mismatched = pa.schema(
[
pa.field("id", pa.int32(), nullable=True, metadata={b"PARQUET:field_id": b"1"}),
pa.field(
"user",
pa.struct(
[
pa.field("name", pa.string(), nullable=True, metadata={b"PARQUET:field_id": b"99"}), # Wrong!
pa.field("age", pa.int32(), nullable=True, metadata={b"PARQUET:field_id": b"100"}), # Wrong!
]
),
nullable=True,
metadata={b"PARQUET:field_id": b"2"},
),
]
)

pa_table = pa.table(
{
"id": pa.array([1, 2, 3], type=pa.int32()),
"user": pa.array(
[
{"name": "Alice", "age": 30},
{"name": "Bob", "age": 25},
{"name": "Charlie", "age": 35},
],
type=pa_schema_mismatched.field("user").type,
),
},
schema=pa_schema_mismatched,
)

file_path = f"s3://warehouse/default/nested_mismatched_field_ids/v{format_version}/test.parquet"
fo = tbl.io.new_output(file_path)
with fo.create(overwrite=True) as fos:
with pq.ParquetWriter(fos, schema=pa_schema_mismatched) as writer:
writer.write_table(pa_table)

# Adding files with mismatched nested field IDs should fail
with pytest.raises(ValueError, match="Field IDs in Parquet file do not match table schema"):
tbl.add_files(file_paths=[file_path])


@pytest.mark.integration