Commit aadc89c

Change DataScan to accept Metadata and io
For partial deletes I want to run a scan over in-memory metadata; changing this API makes that possible.
1 parent 4148edb commit aadc89c
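
The point of the new signature is that a scan no longer needs a catalog-backed Table: anything that can supply a TableMetadata plus a FileIO can plan and execute a scan. A minimal sketch of the new call shape — the metadata location is a placeholder, and parsing a metadata file via TableMetadataUtil is just one way to obtain a TableMetadata; for partial deletes it would instead be built in memory:

```python
from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.table import DataScan
from pyiceberg.table.metadata import TableMetadataUtil

io = PyArrowFileIO()

# Any TableMetadata works -- read from a metadata file here, but it could
# just as well be constructed in memory, which is what partial deletes need.
# The location below is a placeholder, not from this commit.
with io.new_input("s3://bucket/table/metadata/v2.metadata.json").open() as f:
    metadata = TableMetadataUtil.parse_raw(f.read())

# DataScan now takes the metadata and an io directly instead of a Table.
scan = DataScan(table_metadata=metadata, io=io, row_filter="id > 100")
result = scan.to_arrow()
```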

4 files changed: +84, -105 lines

pyiceberg/io/pyarrow.py

Lines changed: 14 additions & 12 deletions
@@ -158,7 +158,7 @@
 from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string
 
 if TYPE_CHECKING:
-    from pyiceberg.table import FileScanTask, Table
+    from pyiceberg.table import FileScanTask
 
 logger = logging.getLogger(__name__)
 
@@ -1034,7 +1034,8 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic
 
 def project_table(
     tasks: Iterable[FileScanTask],
-    table: Table,
+    table_metadata: TableMetadata,
+    io: FileIO,
     row_filter: BooleanExpression,
     projected_schema: Schema,
     case_sensitive: bool = True,
@@ -1044,7 +1045,8 @@ def project_table(
 
     Args:
         tasks (Iterable[FileScanTask]): A URI or a path to a local file.
-        table (Table): The table that's being queried.
+        table_metadata (TableMetadata): The table metadata of the table that's being queried
+        io (FileIO): A FileIO to open streams to the object store
         row_filter (BooleanExpression): The expression for filtering rows.
         projected_schema (Schema): The output schema.
         case_sensitive (bool): Case sensitivity when looking up column names.
@@ -1053,24 +1055,24 @@ def project_table(
     Raises:
         ResolveError: When an incompatible query is done.
     """
-    scheme, netloc, _ = PyArrowFileIO.parse_location(table.location())
-    if isinstance(table.io, PyArrowFileIO):
-        fs = table.io.fs_by_scheme(scheme, netloc)
+    scheme, netloc, _ = PyArrowFileIO.parse_location(table_metadata.location)
+    if isinstance(io, PyArrowFileIO):
+        fs = io.fs_by_scheme(scheme, netloc)
     else:
         try:
             from pyiceberg.io.fsspec import FsspecFileIO
 
-            if isinstance(table.io, FsspecFileIO):
+            if isinstance(io, FsspecFileIO):
                 from pyarrow.fs import PyFileSystem
 
-                fs = PyFileSystem(FSSpecHandler(table.io.get_fs(scheme)))
+                fs = PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
             else:
-                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {table.io}")
+                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}")
         except ModuleNotFoundError as e:
             # When FsSpec is not installed
-            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {table.io}") from e
+            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e
 
-    bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive)
+    bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
 
     projected_field_ids = {
         id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
@@ -1089,7 +1091,7 @@ def project_table(
             deletes_per_file.get(task.file.file_path),
             case_sensitive,
             limit,
-            table.name_mapping(),
+            None,
         )
         for task in tasks
     ]
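
For downstream callers, project_table no longer dereferences a Table. A hedged sketch of the new calling convention — read_tasks is a hypothetical wrapper, only the keyword names come from this diff:

```python
from typing import Iterable

import pyarrow as pa

from pyiceberg.expressions import AlwaysTrue
from pyiceberg.io import FileIO
from pyiceberg.io.pyarrow import project_table
from pyiceberg.schema import Schema
from pyiceberg.table import FileScanTask
from pyiceberg.table.metadata import TableMetadata


def read_tasks(
    tasks: Iterable[FileScanTask],
    metadata: TableMetadata,
    io: FileIO,
    schema: Schema,
) -> pa.Table:
    # The Table argument is gone: metadata and io travel separately, so the
    # caller no longer needs a catalog-backed Table object at all.
    return project_table(
        tasks=tasks,
        table_metadata=metadata,
        io=io,
        row_filter=AlwaysTrue(),
        projected_schema=schema,
        case_sensitive=True,
    )
```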

pyiceberg/table/__init__.py

Lines changed: 28 additions & 36 deletions
@@ -1209,7 +1209,8 @@ def scan(
         limit: Optional[int] = None,
     ) -> DataScan:
         return DataScan(
-            table=self,
+            table_metadata=self.metadata,
+            io=self.io,
             row_filter=row_filter,
             selected_fields=selected_fields,
             case_sensitive=case_sensitive,
@@ -1462,7 +1463,8 @@ def _parse_row_filter(expr: Union[str, BooleanExpression]) -> BooleanExpression:
 
 
 class TableScan(ABC):
-    table: Table
+    table_metadata: TableMetadata
+    io: FileIO
     row_filter: BooleanExpression
     selected_fields: Tuple[str, ...]
     case_sensitive: bool
@@ -1472,15 +1474,17 @@ class TableScan(ABC):
 
     def __init__(
         self,
-        table: Table,
+        table_metadata: TableMetadata,
+        io: FileIO,
         row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
         selected_fields: Tuple[str, ...] = ("*",),
         case_sensitive: bool = True,
         snapshot_id: Optional[int] = None,
         options: Properties = EMPTY_DICT,
         limit: Optional[int] = None,
     ):
-        self.table = table
+        self.table_metadata = table_metadata
+        self.io = io
         self.row_filter = _parse_row_filter(row_filter)
         self.selected_fields = selected_fields
         self.case_sensitive = case_sensitive
@@ -1490,19 +1494,20 @@ def __init__(
 
     def snapshot(self) -> Optional[Snapshot]:
         if self.snapshot_id:
-            return self.table.snapshot_by_id(self.snapshot_id)
-        return self.table.current_snapshot()
+            return self.table_metadata.snapshot_by_id(self.snapshot_id)
+        return self.table_metadata.current_snapshot()
 
     def projection(self) -> Schema:
-        current_schema = self.table.schema()
+        current_schema = self.table_metadata.schema()
         if self.snapshot_id is not None:
-            snapshot = self.table.snapshot_by_id(self.snapshot_id)
+            snapshot = self.table_metadata.snapshot_by_id(self.snapshot_id)
             if snapshot is not None:
                 if snapshot.schema_id is not None:
-                    snapshot_schema = self.table.schemas().get(snapshot.schema_id)
-                    if snapshot_schema is not None:
-                        current_schema = snapshot_schema
-                    else:
+                    try:
+                        current_schema = next(
+                            schema for schema in self.table_metadata.schemas if schema.schema_id == snapshot.schema_id
+                        )
+                    except StopIteration:
                         warnings.warn(f"Metadata does not contain schema with id: {snapshot.schema_id}")
             else:
                 raise ValueError(f"Snapshot not found: {self.snapshot_id}")
@@ -1528,7 +1533,7 @@ def update(self: S, **overrides: Any) -> S:
     def use_ref(self: S, name: str) -> S:
         if self.snapshot_id:
             raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")
-        if snapshot := self.table.snapshot_by_name(name):
+        if snapshot := self.table_metadata.snapshot_by_name(name):
             return self.update(snapshot_id=snapshot.snapshot_id)
 
         raise ValueError(f"Cannot scan unknown ref={name}")
@@ -1620,33 +1625,21 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent
 
 
 class DataScan(TableScan):
-    def __init__(
-        self,
-        table: Table,
-        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
-        selected_fields: Tuple[str, ...] = ("*",),
-        case_sensitive: bool = True,
-        snapshot_id: Optional[int] = None,
-        options: Properties = EMPTY_DICT,
-        limit: Optional[int] = None,
-    ):
-        super().__init__(table, row_filter, selected_fields, case_sensitive, snapshot_id, options, limit)
-
     def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
-        project = inclusive_projection(self.table.schema(), self.table.specs()[spec_id])
+        project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id])
         return project(self.row_filter)
 
     @cached_property
     def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
         return KeyDefaultDict(self._build_partition_projection)
 
     def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
-        spec = self.table.specs()[spec_id]
-        return manifest_evaluator(spec, self.table.schema(), self.partition_filters[spec_id], self.case_sensitive)
+        spec = self.table_metadata.specs()[spec_id]
+        return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)
 
     def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
-        spec = self.table.specs()[spec_id]
-        partition_type = spec.partition_type(self.table.schema())
+        spec = self.table_metadata.specs()[spec_id]
+        partition_type = spec.partition_type(self.table_metadata.schema())
         partition_schema = Schema(*partition_type.fields)
         partition_expr = self.partition_filters[spec_id]
 
@@ -1681,16 +1674,14 @@ def plan_files(self) -> Iterable[FileScanTask]:
         if not snapshot:
             return iter([])
 
-        io = self.table.io
-
         # step 1: filter manifests using partition summaries
         # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id
 
         manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
 
         manifests = [
             manifest_file
-            for manifest_file in snapshot.manifests(io)
+            for manifest_file in snapshot.manifests(self.io)
             if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
         ]
 
@@ -1699,7 +1690,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
 
         partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)
         metrics_evaluator = _InclusiveMetricsEvaluator(
-            self.table.schema(), self.row_filter, self.case_sensitive, self.options.get("include_empty_files") == "true"
+            self.table_metadata.schema(), self.row_filter, self.case_sensitive, self.options.get("include_empty_files") == "true"
         ).eval
 
         min_data_sequence_number = _min_data_file_sequence_number(manifests)
@@ -1713,7 +1704,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
             lambda args: _open_manifest(*args),
             [
                 (
-                    io,
+                    self.io,
                     manifest,
                     partition_evaluators[manifest.partition_spec_id],
                     metrics_evaluator,
@@ -1749,7 +1740,8 @@ def to_arrow(self) -> pa.Table:
 
         return project_table(
             self.plan_files(),
-            self.table,
+            self.table_metadata,
+            self.io,
             self.row_filter,
             self.projection(),
             case_sensitive=self.case_sensitive,

pyiceberg/table/metadata.py

Lines changed: 6 additions & 0 deletions
@@ -278,6 +278,12 @@ def new_snapshot_id(self) -> int:
 
         return snapshot_id
 
+    def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
+        """Return the snapshot referenced by the given name or null if no such reference exists."""
+        if ref := self.refs.get(name):
+            return self.snapshot_by_id(ref.snapshot_id)
+        return None
+
     def current_snapshot(self) -> Optional[Snapshot]:
         """Get the current snapshot for this table, or None if there is no current snapshot."""
         if self.current_snapshot_id is not None:
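
snapshot_by_name previously lived on Table; moving it onto TableMetadata is what lets use_ref above work without a table handle. A small usage sketch — resolve_ref is a hypothetical helper, not part of this commit:

```python
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.snapshots import Snapshot


def resolve_ref(metadata: TableMetadata, name: str = "main") -> Snapshot:
    # Follows the named branch/tag in metadata.refs to its snapshot.
    snapshot = metadata.snapshot_by_name(name)
    if snapshot is None:
        raise ValueError(f"Cannot scan unknown ref={name}")
    return snapshot
```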

tests/io/test_pyarrow.py

Lines changed: 36 additions & 57 deletions
@@ -28,7 +28,6 @@
 import pytest
 from pyarrow.fs import FileType, LocalFileSystem
 
-from pyiceberg.catalog.noop import NoopCatalog
 from pyiceberg.exceptions import ResolveError
 from pyiceberg.expressions import (
     AlwaysFalse,
@@ -72,7 +71,7 @@
 from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
 from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.schema import Schema, make_compatible_name, visit
-from pyiceberg.table import FileScanTask, Table, TableProperties
+from pyiceberg.table import FileScanTask, TableProperties
 from pyiceberg.table.metadata import TableMetadataV2
 from pyiceberg.typedef import UTF8
 from pyiceberg.types import (
@@ -876,7 +875,7 @@ def project(
     schema: Schema, files: List[str], expr: Optional[BooleanExpression] = None, table_schema: Optional[Schema] = None
 ) -> pa.Table:
     return project_table(
-        [
+        tasks=[
             FileScanTask(
                 DataFile(
                     content=DataFileContent.DATA,
@@ -889,21 +888,16 @@ def project(
             )
             for file in files
         ],
-        Table(
-            ("namespace", "table"),
-            metadata=TableMetadataV2(
-                location="file://a/b/",
-                last_column_id=1,
-                format_version=2,
-                schemas=[table_schema or schema],
-                partition_specs=[PartitionSpec()],
-            ),
-            metadata_location="file://a/b/c.json",
-            io=PyArrowFileIO(),
-            catalog=NoopCatalog("NoopCatalog"),
+        table_metadata=TableMetadataV2(
+            location="file://a/b/",
+            last_column_id=1,
+            format_version=2,
+            schemas=[table_schema or schema],
+            partition_specs=[PartitionSpec()],
         ),
-        expr or AlwaysTrue(),
-        schema,
+        io=PyArrowFileIO(),
+        row_filter=expr or AlwaysTrue(),
+        projected_schema=schema,
         case_sensitive=True,
     )
 
@@ -1362,20 +1356,15 @@ def test_delete(deletes_file: str, example_task: FileScanTask, table_schema_simp
 
     with_deletes = project_table(
         tasks=[example_task_with_delete],
-        table=Table(
-            ("namespace", "table"),
-            metadata=TableMetadataV2(
-                location=metadata_location,
-                last_column_id=1,
-                format_version=2,
-                current_schema_id=1,
-                schemas=[table_schema_simple],
-                partition_specs=[PartitionSpec()],
-            ),
-            metadata_location=metadata_location,
-            io=load_file_io(),
-            catalog=NoopCatalog("noop"),
+        table_metadata=TableMetadataV2(
+            location=metadata_location,
+            last_column_id=1,
+            format_version=2,
+            current_schema_id=1,
+            schemas=[table_schema_simple],
+            partition_specs=[PartitionSpec()],
         ),
+        io=load_file_io(),
         row_filter=AlwaysTrue(),
         projected_schema=table_schema_simple,
     )
@@ -1405,20 +1394,15 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_
 
     with_deletes = project_table(
         tasks=[example_task_with_delete],
-        table=Table(
-            ("namespace", "table"),
-            metadata=TableMetadataV2(
-                location=metadata_location,
-                last_column_id=1,
-                format_version=2,
-                current_schema_id=1,
-                schemas=[table_schema_simple],
-                partition_specs=[PartitionSpec()],
-            ),
-            metadata_location=metadata_location,
-            io=load_file_io(),
-            catalog=NoopCatalog("noop"),
+        table_metadata=TableMetadataV2(
+            location=metadata_location,
+            last_column_id=1,
+            format_version=2,
+            current_schema_id=1,
+            schemas=[table_schema_simple],
+            partition_specs=[PartitionSpec()],
         ),
+        io=load_file_io(),
         row_filter=AlwaysTrue(),
         projected_schema=table_schema_simple,
     )
@@ -1439,21 +1423,16 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_
 def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple: Schema) -> None:
     metadata_location = "file://a/b/c.json"
     projection = project_table(
-        [example_task],
-        Table(
-            ("namespace", "table"),
-            metadata=TableMetadataV2(
-                location=metadata_location,
-                last_column_id=1,
-                format_version=2,
-                current_schema_id=1,
-                schemas=[table_schema_simple],
-                partition_specs=[PartitionSpec()],
-            ),
-            metadata_location=metadata_location,
-            io=load_file_io(properties={"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"}, location=metadata_location),
-            catalog=NoopCatalog("NoopCatalog"),
+        tasks=[example_task],
+        table_metadata=TableMetadataV2(
+            location=metadata_location,
+            last_column_id=1,
+            format_version=2,
+            current_schema_id=1,
+            schemas=[table_schema_simple],
+            partition_specs=[PartitionSpec()],
        ),
+        io=load_file_io(properties={"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"}, location=metadata_location),
         case_sensitive=True,
         projected_schema=table_schema_simple,
         row_filter=AlwaysTrue(),