Commit a925d69

Merge branch 'fd-update-datascan' of github.com:Fokko/iceberg-python into fd-add-ability-to-delete-full-data-files

2 parents: c3fa7e7 + fbf6492

5 files changed: +92 −91 lines

pyiceberg/io/pyarrow.py

Lines changed: 14 additions & 12 deletions

@@ -159,7 +159,7 @@
 from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string

 if TYPE_CHECKING:
-    from pyiceberg.table import FileScanTask, Table
+    from pyiceberg.table import FileScanTask

 logger = logging.getLogger(__name__)

@@ -1046,7 +1046,8 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic

 def project_table(
     tasks: Iterable[FileScanTask],
-    table: Table,
+    table_metadata: TableMetadata,
+    io: FileIO,
     row_filter: BooleanExpression,
     projected_schema: Schema,
     case_sensitive: bool = True,
@@ -1056,7 +1057,8 @@ def project_table(

     Args:
         tasks (Iterable[FileScanTask]): A URI or a path to a local file.
-        table (Table): The table that's being queried.
+        table_metadata (TableMetadata): The table metadata of the table that's being queried
+        io (FileIO): A FileIO to open streams to the object store
         row_filter (BooleanExpression): The expression for filtering rows.
         projected_schema (Schema): The output schema.
         case_sensitive (bool): Case sensitivity when looking up column names.
@@ -1065,24 +1067,24 @@ def project_table(
     Raises:
         ResolveError: When an incompatible query is done.
     """
-    scheme, netloc, _ = PyArrowFileIO.parse_location(table.location())
-    if isinstance(table.io, PyArrowFileIO):
-        fs = table.io.fs_by_scheme(scheme, netloc)
+    scheme, netloc, _ = PyArrowFileIO.parse_location(table_metadata.location)
+    if isinstance(io, PyArrowFileIO):
+        fs = io.fs_by_scheme(scheme, netloc)
     else:
         try:
             from pyiceberg.io.fsspec import FsspecFileIO

-            if isinstance(table.io, FsspecFileIO):
+            if isinstance(io, FsspecFileIO):
                 from pyarrow.fs import PyFileSystem

-                fs = PyFileSystem(FSSpecHandler(table.io.get_fs(scheme)))
+                fs = PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
             else:
-                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {table.io}")
+                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}")
         except ModuleNotFoundError as e:
             # When FsSpec is not installed
-            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {table.io}") from e
+            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e

-    bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive)
+    bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)

     projected_field_ids = {
         id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
@@ -1101,7 +1103,7 @@ def project_table(
             deletes_per_file.get(task.file.file_path),
             case_sensitive,
             limit,
-            table.name_mapping(),
+            table_metadata.name_mapping(),
         )
         for task in tasks
     ]
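
For context, a minimal caller sketch of the refactored project_table signature (not part of this commit): it assumes an already-loaded pyiceberg Table named tbl and passes the table's metadata and FileIO separately, as the changed signature above requires; the filter is illustrative.

# Illustrative sketch only: calling the refactored project_table.
# `tbl` is assumed to be a pyiceberg Table already loaded from a catalog.
from pyiceberg.expressions import AlwaysTrue
from pyiceberg.io.pyarrow import project_table

arrow_table = project_table(
    tasks=tbl.scan().plan_files(),   # FileScanTasks to read
    table_metadata=tbl.metadata,     # TableMetadata replaces the old `table` argument
    io=tbl.io,                       # FileIO used to open streams to the object store
    row_filter=AlwaysTrue(),         # read everything in this sketch
    projected_schema=tbl.schema(),   # project the full schema
    case_sensitive=True,
)
print(arrow_table.num_rows)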

pyiceberg/table/__init__.py

Lines changed: 19 additions & 22 deletions

@@ -108,7 +108,6 @@
 )
 from pyiceberg.table.name_mapping import (
     NameMapping,
-    parse_mapping_from_json,
     update_mapping,
 )
 from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
@@ -1220,7 +1219,8 @@ def scan(
         limit: Optional[int] = None,
     ) -> DataScan:
         return DataScan(
-            table=self,
+            table_metadata=self.metadata,
+            io=self.io,
             row_filter=row_filter,
             selected_fields=selected_fields,
             case_sensitive=case_sensitive,
@@ -1317,10 +1317,7 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive

     def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
-        if name_mapping_json := self.properties.get(TableProperties.DEFAULT_NAME_MAPPING):
-            return parse_mapping_from_json(name_mapping_json)
-        else:
-            return None
+        return self.metadata.name_mapping()

     def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
         """
@@ -1513,10 +1510,11 @@ def projection(self) -> Schema:
             snapshot = self.table_metadata.snapshot_by_id(self.snapshot_id)
             if snapshot is not None:
                 if snapshot.schema_id is not None:
-                    snapshot_schema = self.table_metadata.schemas().get(snapshot.schema_id)
-                    if snapshot_schema is not None:
-                        current_schema = snapshot_schema
-                    else:
+                    try:
+                        current_schema = next(
+                            schema for schema in self.table_metadata.schemas if schema.schema_id == snapshot.schema_id
+                        )
+                    except StopIteration:
                         warnings.warn(f"Metadata does not contain schema with id: {snapshot.schema_id}")
             else:
                 raise ValueError(f"Snapshot not found: {self.snapshot_id}")
@@ -1542,7 +1540,7 @@ def update(self: S, **overrides: Any) -> S:
     def use_ref(self: S, name: str) -> S:
         if self.snapshot_id:
             raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")
-        if snapshot := self.table.snapshot_by_name(name):
+        if snapshot := self.table_metadata.snapshot_by_name(name):
            return self.update(snapshot_id=snapshot.snapshot_id)

        raise ValueError(f"Cannot scan unknown ref={name}")
@@ -1636,20 +1634,20 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent
 class DataScan(TableScan):

     def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
-        project = inclusive_projection(self.table.schema(), self.table.specs()[spec_id])
+        project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id])
         return project(self.row_filter)

     @cached_property
     def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
         return KeyDefaultDict(self._build_partition_projection)

     def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
-        spec = self.table.specs()[spec_id]
-        return manifest_evaluator(spec, self.table.schema(), self.partition_filters[spec_id], self.case_sensitive)
+        spec = self.table_metadata.specs()[spec_id]
+        return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)

     def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
-        spec = self.table.specs()[spec_id]
-        partition_type = spec.partition_type(self.table.schema())
+        spec = self.table_metadata.specs()[spec_id]
+        partition_type = spec.partition_type(self.table_metadata.schema())
         partition_schema = Schema(*partition_type.fields)
         partition_expr = self.partition_filters[spec_id]

@@ -1684,16 +1682,14 @@ def plan_files(self) -> Iterable[FileScanTask]:
         if not snapshot:
             return iter([])

-        io = self.table.io
-
         # step 1: filter manifests using partition summaries
         # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id

         manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)

         manifests = [
             manifest_file
-            for manifest_file in snapshot.manifests(io)
+            for manifest_file in snapshot.manifests(self.io)
             if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
         ]

@@ -1702,7 +1698,7 @@ def plan_files(self) -> Iterable[FileScanTask]:

         partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)
         metrics_evaluator = _InclusiveMetricsEvaluator(
-            self.table.schema(), self.row_filter, self.case_sensitive, self.options.get("include_empty_files") == "true"
+            self.table_metadata.schema(), self.row_filter, self.case_sensitive, self.options.get("include_empty_files") == "true"
         ).eval

         min_data_sequence_number = _min_data_file_sequence_number(manifests)
@@ -1716,7 +1712,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
                 lambda args: _open_manifest(*args),
                 [
                     (
-                        io,
+                        self.io,
                         manifest,
                         partition_evaluators[manifest.partition_spec_id],
                         metrics_evaluator,
@@ -1752,7 +1748,8 @@ def to_arrow(self) -> pa.Table:

         return project_table(
             self.plan_files(),
-            self.table,
+            self.table_metadata,
+            self.io,
             self.row_filter,
             self.projection(),
             case_sensitive=self.case_sensitive,
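
A hedged usage sketch of the refactored scan path (not part of this commit): Table.scan() now hands DataScan the table's metadata and FileIO rather than the Table itself, so planning and reading go through self.table_metadata and self.io. The table name, column "baz", and filter value below are made up for the example; tbl is assumed to be a Table loaded from a catalog.

# Illustrative sketch only: exercising the refactored scan path.
from pyiceberg.expressions import GreaterThanOrEqual

scan = tbl.scan(
    row_filter=GreaterThanOrEqual("baz", 100),  # evaluated against self.table_metadata.schema()
    selected_fields=("baz",),
    case_sensitive=True,
)
# plan_files() reads manifests via self.io; to_arrow() delegates to project_table()
# with (self.table_metadata, self.io) as shown in the pyarrow.py diff above.
result = scan.to_arrow()
print(result.num_rows)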

pyiceberg/table/metadata.py

Lines changed: 14 additions & 0 deletions

@@ -35,6 +35,7 @@
 from pyiceberg.exceptions import ValidationError
 from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec, assign_fresh_partition_spec_ids
 from pyiceberg.schema import Schema, assign_fresh_schema_ids
+from pyiceberg.table.name_mapping import NameMapping, parse_mapping_from_json
 from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType
 from pyiceberg.table.snapshots import MetadataLogEntry, Snapshot, SnapshotLogEntry
 from pyiceberg.table.sorting import (
@@ -237,6 +238,13 @@ def schema(self) -> Schema:
         """Return the schema for this table."""
         return next(schema for schema in self.schemas if schema.schema_id == self.current_schema_id)

+    def name_mapping(self) -> Optional[NameMapping]:
+        """Return the table's field-id NameMapping."""
+        if name_mapping_json := self.properties.get("schema.name-mapping.default"):
+            return parse_mapping_from_json(name_mapping_json)
+        else:
+            return None
+
     def spec(self) -> PartitionSpec:
         """Return the partition spec of this table."""
         return next(spec for spec in self.partition_specs if spec.spec_id == self.default_spec_id)
@@ -278,6 +286,12 @@ def new_snapshot_id(self) -> int:

         return snapshot_id

+    def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
+        """Return the snapshot referenced by the given name or null if no such reference exists."""
+        if ref := self.refs.get(name):
+            return self.snapshot_by_id(ref.snapshot_id)
+        return None
+
     def current_snapshot(self) -> Optional[Snapshot]:
         """Get the current snapshot for this table, or None if there is no current snapshot."""
         if self.current_snapshot_id is not None:
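
A small sketch of the two helpers added to TableMetadata (illustrative, not part of the diff); tbl is assumed to be an existing pyiceberg Table, whose .metadata attribute is a TableMetadata instance, and the ref name "main" is only an example.

# Illustrative sketch only: using the new TableMetadata helpers.
metadata = tbl.metadata

# name_mapping() parses the "schema.name-mapping.default" table property, if set.
mapping = metadata.name_mapping()
if mapping is not None:
    print(mapping)

# snapshot_by_name() resolves a branch or tag ref to its Snapshot, or returns None.
snapshot = metadata.snapshot_by_name("main")
if snapshot is not None:
    print(snapshot.snapshot_id)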

tests/integration/test_add_files.py

Lines changed: 9 additions & 0 deletions

@@ -158,6 +158,9 @@ def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog:
     for col in df.columns:
         assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null"

+    # check that the table can be read by pyiceberg
+    assert len(tbl.scan().to_arrow()) == 5, "Expected 5 rows"
+

 @pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
@@ -255,6 +258,9 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(
         value_count = 1 if col == "quux" else 6
         assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null"

+    # check that the table can be read by pyiceberg
+    assert len(tbl.scan().to_arrow()) == 6, "Expected 6 rows"
+

 @pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
@@ -324,6 +330,9 @@ def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Ca
     assert [row.file_count for row in partition_rows] == [5]
     assert [(row.partition.baz, row.partition.qux_month) for row in partition_rows] == [(123, 650)]

+    # check that the table can be read by pyiceberg
+    assert len(tbl.scan().to_arrow()) == 5, "Expected 5 rows"
+

 @pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
