Skip to content

Commit fbf6492

Browse files
authored
Merge pull request #1 from HonahX/honahx-update-datascan
Fix Name-mapping issue in fd-update-datascan
2 parents aadc89c + 7e59342 commit fbf6492

File tree

4 files changed

+19
-6
lines changed

4 files changed

+19
-6
lines changed

pyiceberg/io/pyarrow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1091,7 +1091,7 @@ def project_table(
10911091
deletes_per_file.get(task.file.file_path),
10921092
case_sensitive,
10931093
limit,
1094-
None,
1094+
table_metadata.name_mapping(),
10951095
)
10961096
for task in tasks
10971097
]

pyiceberg/table/__init__.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@
103103
)
104104
from pyiceberg.table.name_mapping import (
105105
NameMapping,
106-
parse_mapping_from_json,
107106
update_mapping,
108107
)
109108
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
@@ -1307,10 +1306,7 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
13071306

13081307
def name_mapping(self) -> Optional[NameMapping]:
    """Return the table's field-id NameMapping.

    The lookup is delegated entirely to the underlying table metadata,
    which knows how to read and parse the mapping from the table's
    properties when one is configured.
    """
    return self.metadata.name_mapping()
13141310

13151311
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
13161312
"""

pyiceberg/table/metadata.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from pyiceberg.exceptions import ValidationError
3636
from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec, assign_fresh_partition_spec_ids
3737
from pyiceberg.schema import Schema, assign_fresh_schema_ids
38+
from pyiceberg.table.name_mapping import NameMapping, parse_mapping_from_json
3839
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
3940
from pyiceberg.table.snapshots import MetadataLogEntry, Snapshot, SnapshotLogEntry
4041
from pyiceberg.table.sorting import (
@@ -237,6 +238,13 @@ def schema(self) -> Schema:
237238
"""Return the schema for this table."""
238239
return next(schema for schema in self.schemas if schema.schema_id == self.current_schema_id)
239240

241+
def name_mapping(self) -> Optional[NameMapping]:
    """Return the table's field-id NameMapping.

    The mapping is stored as JSON in the table properties under the
    ``schema.name-mapping.default`` key; when that property is absent
    (or empty) the table has no name mapping and None is returned.
    """
    mapping_json = self.properties.get("schema.name-mapping.default")
    if not mapping_json:
        return None
    return parse_mapping_from_json(mapping_json)
247+
240248
def spec(self) -> PartitionSpec:
241249
"""Return the partition spec of this table."""
242250
return next(spec for spec in self.partition_specs if spec.spec_id == self.default_spec_id)

tests/integration/test_add_files.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,9 @@ def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog:
158158
for col in df.columns:
159159
assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null"
160160

161+
# check that the table can be read by pyiceberg
162+
assert len(tbl.scan().to_arrow()) == 5, "Expected 5 rows"
163+
161164

162165
@pytest.mark.integration
163166
@pytest.mark.parametrize("format_version", [1, 2])
@@ -255,6 +258,9 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(
255258
value_count = 1 if col == "quux" else 6
256259
assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null"
257260

261+
# check that the table can be read by pyiceberg
262+
assert len(tbl.scan().to_arrow()) == 6, "Expected 6 rows"
263+
258264

259265
@pytest.mark.integration
260266
@pytest.mark.parametrize("format_version", [1, 2])
@@ -324,6 +330,9 @@ def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Ca
324330
assert [row.file_count for row in partition_rows] == [5]
325331
assert [(row.partition.baz, row.partition.qux_month) for row in partition_rows] == [(123, 650)]
326332

333+
# check that the table can be read by pyiceberg
334+
assert len(tbl.scan().to_arrow()) == 5, "Expected 5 rows"
335+
327336

328337
@pytest.mark.integration
329338
@pytest.mark.parametrize("format_version", [1, 2])

0 commit comments

Comments
 (0)