Apache Iceberg version
0.10.0 (latest release)
Please describe the bug 🐞
PyIceberg raises ValueError: Could not find field with id: X when reading partitioned tables with column projection after schema evolution. The failure occurs when all of the following hold:
- The table is partitioned
- The query uses column projection
- The partition field is NOT included in the selected columns
- The schema has evolved
- An old partition's data file is missing a field that was requested in the query
My hunch at the root cause:
In pyiceberg.io.pyarrow._get_column_projection_values, we try to derive the partition schema from the projected schema, but this fails when the projected schema doesn't contain the partition source field.
I think this can be fixed by passing the full table schema instead of the projected_schema to partition_spec.partition_type(schema). I'd be happy to submit a patch if this is regarded as the correct approach.
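As a minimal, self-contained illustration of that hunch (this uses only public PyIceberg classes, not the actual internals of _get_column_projection_values): partition_type() resolves each partition source field by id, so it raises when given a projected schema that no longer contains that field, while the full table schema resolves fine.

# Minimal illustration of the root cause, using the same schema/spec as the reproduction below.
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import DateType, IntegerType, NestedField

table_schema = Schema(
    NestedField(field_id=1, name="partition_date", field_type=DateType(), required=False),
    NestedField(field_id=2, name="id", field_type=IntegerType(), required=False),
)
# A projection that drops the partition column, as in the failing scan below.
projected_schema = Schema(
    NestedField(field_id=2, name="id", field_type=IntegerType(), required=False),
)
spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_date")
)

spec.partition_type(table_schema)      # works: source field id 1 is present
spec.partition_type(projected_schema)  # raises ValueError: Could not find field with id: 1

Passing the full table schema at that call site, rather than the projected schema, is exactly the change proposed above.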
Reproduction script:
import tempfile
from datetime import date
from pathlib import Path
import pyarrow as pa
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import DateType, IntegerType, NestedField, StringType
temp_dir = tempfile.mkdtemp()
warehouse_path = Path(temp_dir) / "warehouse"
warehouse_path.mkdir()
catalog_db = Path(temp_dir) / "catalog.db"
catalog = SqlCatalog(
    "test_catalog",
    **{
        "uri": f"sqlite:///{catalog_db}",
        "warehouse": f"file://{warehouse_path}",
    }
)
catalog.create_namespace("default")
# Define initial schema
initial_schema = Schema(
    NestedField(field_id=1, name="partition_date", field_type=DateType(), required=False),
    NestedField(field_id=2, name="id", field_type=IntegerType(), required=False),
    NestedField(field_id=3, name="name", field_type=StringType(), required=False),
    NestedField(field_id=4, name="value", field_type=IntegerType(), required=False),
)
partition_spec = PartitionSpec(
    PartitionField(
        source_id=1,
        field_id=1000,
        transform=IdentityTransform(),
        name="partition_date"
    )
)
table = catalog.create_table(
    "default.test_table",
    schema=initial_schema,
    partition_spec=partition_spec,
)
data_v1 = pa.Table.from_pylist(
    [
        {"partition_date": date(2024, 1, 1), "id": 1, "name": "Alice", "value": 100},
    ],
    schema=pa.schema([
        ("partition_date", pa.date32()),
        ("id", pa.int32()),
        ("name", pa.string()),
        ("value", pa.int32()),
    ])
)
table.append(data_v1)
# Evolve schema: Add 'new_column' field
with table.update_schema() as update:
    update.add_column("new_column", StringType())
table = catalog.load_table("default.test_table")
data_v2 = pa.Table.from_pylist(
    [
        {"partition_date": date(2024, 1, 2), "id": 2, "name": "Bob", "value": 200, "new_column": "new_value"},
    ],
    schema=pa.schema([
        ("partition_date", pa.date32()),
        ("id", pa.int32()),
        ("name", pa.string()),
        ("value", pa.int32()),
        ("new_column", pa.string()),
    ])
)
table.append(data_v2)
# Test 1: Query with all fields (should work)
scan = table.scan(selected_fields=("partition_date", "id", "name", "value", "new_column"))
result = scan.to_arrow()
# Test 2: Query WITHOUT partition field but requesting new_column
# This triggers the bug when reading partition 2024-01-01
scan2 = table.scan(selected_fields=("id", "name", "value", "new_column"))
result2 = scan2.to_arrow()
Traceback:
Traceback (most recent call last):
File "/mnt/home/kevin.jiao/repos/research/reproduce_pyiceberg_bug_minimal.py", line 92, in <module>
result2 = scan2.to_arrow()
^^^^^^^^^^^^^^^^
File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/table/__init__.py", line 1989, in to_arrow
).to_table(self.plan_files())
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 1664, in to_table
result = pa.concat_tables(
^^^^^^^^^^^^^^^^^
File "pyarrow/table.pxi", line 6307, in pyarrow.lib.concat_tables
File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 1665, in <genexpr>
(pa.Table.from_batches([batch]) for batch in itertools.chain([first_batch], batches)), promote_options="permissive"
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 1708, in to_record_batches
for batches in executor.map(batches_for_task, tasks):
File "/mnt/home/kevin.jiao/.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py", line 619, in result_iterator
yield _result_or_cancel(fs.pop())
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/home/kevin.jiao/.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py", line 317, in _result_or_cancel
return fut.result(timeout)
^^^^^^^^^^^^^^^^^^^
File "/mnt/home/kevin.jiao/.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/mnt/home/kevin.jiao/.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/mnt/home/kevin.jiao/.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 1705, in batches_for_task
return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 1744, in _record_batches_from_scan_tasks_and_deletes
for batch in batches:
File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 1513, in _task_to_record_batches
projected_missing_fields = _get_column_projection_values(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 1472, in _get_column_projection_values
partition_schema = partition_spec.partition_type(projected_schema)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/partitioning.py", line 226, in partition_type
source_type = schema.find_type(field.source_id)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/schema.py", line 235, in find_type
field = self.find_field(name_or_id=name_or_id, case_sensitive=case_sensitive)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/schema.py", line 212, in find_field
raise ValueError(f"Could not find field with id: {name_or_id}")
Willingness to contribute
- I can contribute a fix for this bug independently
- I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- I cannot contribute a fix for this bug at this time