From 57f51a29951a01cec586c9fbd446bba98e87c5f3 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Wed, 21 May 2025 16:12:23 +0100 Subject: [PATCH 01/12] Refactor hierarchy structure to share code --- pyiceberg/table/__init__.py | 348 +++++++++++++++++++++--------------- 1 file changed, 199 insertions(+), 149 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 78676a774a..5f62632e82 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1507,16 +1507,17 @@ def _parse_row_filter(expr: Union[str, BooleanExpression]) -> BooleanExpression: return parser.parse(expr) if isinstance(expr, str) else expr -S = TypeVar("S", bound="TableScan", covariant=True) +S = TypeVar("S", bound="AbstractTableScan", covariant=True) -class TableScan(ABC): +class AbstractTableScan(ABC): + """A base class for all table scans.""" + table_metadata: TableMetadata io: FileIO row_filter: BooleanExpression selected_fields: Tuple[str, ...] case_sensitive: bool - snapshot_id: Optional[int] options: Properties limit: Optional[int] @@ -1527,7 +1528,6 @@ def __init__( row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, selected_fields: Tuple[str, ...] = ("*",), case_sensitive: bool = True, - snapshot_id: Optional[int] = None, options: Properties = EMPTY_DICT, limit: Optional[int] = None, ): @@ -1536,10 +1536,177 @@ def __init__( self.row_filter = _parse_row_filter(row_filter) self.selected_fields = selected_fields self.case_sensitive = case_sensitive - self.snapshot_id = snapshot_id self.options = options self.limit = limit + @abstractmethod + def projection(self) -> Schema: ... + + @abstractmethod + def plan_files(self) -> Iterable[ScanTask]: ... + + @abstractmethod + def to_arrow(self) -> pa.Table: ... + + @abstractmethod + def count(self) -> int: ... + + def select(self: S, *field_names: str) -> S: + if "*" in self.selected_fields: + return self.update(selected_fields=field_names) + return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names)))) + + def filter(self: S, expr: Union[str, BooleanExpression]) -> S: + return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr))) + + def with_case_sensitive(self: S, case_sensitive: bool = True) -> S: + return self.update(case_sensitive=case_sensitive) + + def update(self: S, **overrides: Any) -> S: + """Create a copy of this table scan with updated fields.""" + return type(self)(**{**self.__dict__, **overrides}) + + def to_pandas(self, **kwargs: Any) -> pd.DataFrame: + """Read a Pandas DataFrame eagerly from this Iceberg table scan. + + Returns: + pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table scan + """ + return self.to_arrow().to_pandas(**kwargs) + + def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: + """Shorthand for loading this table scan in DuckDB. + + Returns: + DuckDBPyConnection: In memory DuckDB connection with the Iceberg table scan. + """ + import duckdb + + con = connection or duckdb.connect(database=":memory:") + con.register(table_name, self.to_arrow()) + + return con + + def to_ray(self) -> ray.data.dataset.Dataset: + """Read a Ray Dataset eagerly from this Iceberg table scan. + + Returns: + ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table scan + """ + import ray + + return ray.data.from_arrow(self.to_arrow()) + + def to_polars(self) -> pl.DataFrame: + """Read a Polars DataFrame from this Iceberg table scan. 
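+
+        Example (an illustrative sketch; assumes ``table`` is an already-loaded ``Table``):
+
+            >>> scan = table.scan(row_filter="id > 10")
+            >>> df = scan.to_polars()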
+ + Returns: + pl.DataFrame: Materialized Polars Dataframe from the Iceberg table scan + """ + import polars as pl + + result = pl.from_arrow(self.to_arrow()) + if isinstance(result, pl.Series): + result = result.to_frame() + + return result + + +class FileBasedTableScan(AbstractTableScan, ABC): + """A base class for table scans that plan FileScanTasks.""" + + @abstractmethod + def plan_files(self) -> Iterable[FileScanTask]: ... + + def to_arrow(self) -> pa.Table: + """Read an Arrow table eagerly from this scan. + + All rows will be loaded into memory at once. + + Returns: + pa.Table: Materialized Arrow Table from the Iceberg table scan + """ + from pyiceberg.io.pyarrow import ArrowScan + + return ArrowScan( + self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit + ).to_table(self.plan_files()) + + def to_arrow_batch_reader(self) -> pa.RecordBatchReader: + """Return an Arrow RecordBatchReader from this scan. + + For large results, using a RecordBatchReader requires less memory than + loading an Arrow Table for the same DataScan, because a RecordBatch + is read one at a time. + + Returns: + pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table scan + which can be used to read a stream of record batches one by one. + """ + import pyarrow as pa + + from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow + + target_schema = schema_to_pyarrow(self.projection()) + batches = ArrowScan( + self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit + ).to_record_batches(self.plan_files()) + + return pa.RecordBatchReader.from_batches( + target_schema, + batches, + ).cast(target_schema) + + def count(self) -> int: + from pyiceberg.io.pyarrow import ArrowScan + + # Usage: Calculates the total number of records in a Scan that haven't had positional deletes. + res = 0 + # every task is a FileScanTask + tasks = self.plan_files() + + for task in tasks: + # task.residual is a Boolean Expression if the filter condition is fully satisfied by the + # partition value and task.delete_files represents that positional delete haven't been merged yet + # hence those files have to read as a pyarrow table applying the filter and deletes + if task.residual == AlwaysTrue() and len(task.delete_files) == 0: + # Every File has a metadata stat that stores the file record count + res += task.file.record_count + else: + arrow_scan = ArrowScan( + table_metadata=self.table_metadata, + io=self.io, + projected_schema=self.projection(), + row_filter=self.row_filter, + case_sensitive=self.case_sensitive, + ) + tbl = arrow_scan.to_table([task]) + res += len(tbl) + return res + + +T = TypeVar("T", bound="TableScan", covariant=True) + + +class TableScan(AbstractTableScan, ABC): + """A base class for non-incremental table scans that target a single snapshot.""" + + snapshot_id: Optional[int] + + def __init__( + self, + table_metadata: TableMetadata, + io: FileIO, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] 
= ("*",), + case_sensitive: bool = True, + snapshot_id: Optional[int] = None, + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + ): + super().__init__(table_metadata, io, row_filter, selected_fields, case_sensitive, options, limit) + self.snapshot_id = snapshot_id + def snapshot(self) -> Optional[Snapshot]: if self.snapshot_id: return self.table_metadata.snapshot_by_id(self.snapshot_id) @@ -1565,23 +1732,7 @@ def projection(self) -> Schema: return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive) - @abstractmethod - def plan_files(self) -> Iterable[ScanTask]: ... - - @abstractmethod - def to_arrow(self) -> pa.Table: ... - - @abstractmethod - def to_pandas(self, **kwargs: Any) -> pd.DataFrame: ... - - @abstractmethod - def to_polars(self) -> pl.DataFrame: ... - - def update(self: S, **overrides: Any) -> S: - """Create a copy of this table scan with updated fields.""" - return type(self)(**{**self.__dict__, **overrides}) - - def use_ref(self: S, name: str) -> S: + def use_ref(self: T, name: str) -> T: if self.snapshot_id: raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}") if snapshot := self.table_metadata.snapshot_by_name(name): @@ -1589,20 +1740,6 @@ def use_ref(self: S, name: str) -> S: raise ValueError(f"Cannot scan unknown ref={name}") - def select(self: S, *field_names: str) -> S: - if "*" in self.selected_fields: - return self.update(selected_fields=field_names) - return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names)))) - - def filter(self: S, expr: Union[str, BooleanExpression]) -> S: - return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr))) - - def with_case_sensitive(self: S, case_sensitive: bool = True) -> S: - return self.update(case_sensitive=case_sensitive) - - @abstractmethod - def count(self) -> int: ... - class ScanTask(ABC): pass @@ -1688,7 +1825,30 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent return set() -class DataScan(TableScan): +class DataScan(FileBasedTableScan, TableScan): + """A scan of a table's data. + + Args: + row_filter: + A string or BooleanExpression that describes the + desired rows + selected_fields: + A tuple of strings representing the column names + to return in the output dataframe. + case_sensitive: + If True column matching is case sensitive + snapshot_id: + Optional Snapshot ID to time travel to. If None, + scans the table as of the current snapshot ID. + options: + Additional Table properties as a dictionary of + string key value pairs to use for this scan. + limit: + An integer representing the number of rows to + return in the scan result. If None, fetches all + matching rows. + """ + def _build_partition_projection(self, spec_id: int) -> BooleanExpression: project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive) return project(self.row_filter) @@ -1745,7 +1905,8 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu ) ) - def _check_sequence_number(self, min_sequence_number: int, manifest: ManifestFile) -> bool: + @staticmethod + def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool: """Ensure that no manifests are loaded that contain deletes that are older than the data. 
Args: @@ -1834,117 +1995,6 @@ def plan_files(self) -> Iterable[FileScanTask]: for data_entry in data_entries ] - def to_arrow(self) -> pa.Table: - """Read an Arrow table eagerly from this DataScan. - - All rows will be loaded into memory at once. - - Returns: - pa.Table: Materialized Arrow Table from the Iceberg table's DataScan - """ - from pyiceberg.io.pyarrow import ArrowScan - - return ArrowScan( - self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit - ).to_table(self.plan_files()) - - def to_arrow_batch_reader(self) -> pa.RecordBatchReader: - """Return an Arrow RecordBatchReader from this DataScan. - - For large results, using a RecordBatchReader requires less memory than - loading an Arrow Table for the same DataScan, because a RecordBatch - is read one at a time. - - Returns: - pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan - which can be used to read a stream of record batches one by one. - """ - import pyarrow as pa - - from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow - - target_schema = schema_to_pyarrow(self.projection()) - batches = ArrowScan( - self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit - ).to_record_batches(self.plan_files()) - - return pa.RecordBatchReader.from_batches( - target_schema, - batches, - ).cast(target_schema) - - def to_pandas(self, **kwargs: Any) -> pd.DataFrame: - """Read a Pandas DataFrame eagerly from this Iceberg table. - - Returns: - pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table - """ - return self.to_arrow().to_pandas(**kwargs) - - def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: - """Shorthand for loading the Iceberg Table in DuckDB. - - Returns: - DuckDBPyConnection: In memory DuckDB connection with the Iceberg table. - """ - import duckdb - - con = connection or duckdb.connect(database=":memory:") - con.register(table_name, self.to_arrow()) - - return con - - def to_ray(self) -> ray.data.dataset.Dataset: - """Read a Ray Dataset eagerly from this Iceberg table. - - Returns: - ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table - """ - import ray - - return ray.data.from_arrow(self.to_arrow()) - - def to_polars(self) -> pl.DataFrame: - """Read a Polars DataFrame from this Iceberg table. - - Returns: - pl.DataFrame: Materialized Polars Dataframe from the Iceberg table - """ - import polars as pl - - result = pl.from_arrow(self.to_arrow()) - if isinstance(result, pl.Series): - result = result.to_frame() - - return result - - def count(self) -> int: - from pyiceberg.io.pyarrow import ArrowScan - - # Usage: Calculates the total number of records in a Scan that haven't had positional deletes. 
- res = 0 - # every task is a FileScanTask - tasks = self.plan_files() - - for task in tasks: - # task.residual is a Boolean Expression if the filter condition is fully satisfied by the - # partition value and task.delete_files represents that positional delete haven't been merged yet - # hence those files have to read as a pyarrow table applying the filter and deletes - if task.residual == AlwaysTrue() and len(task.delete_files) == 0: - # Every File has a metadata stat that stores the file record count - res += task.file.record_count - else: - arrow_scan = ArrowScan( - table_metadata=self.table_metadata, - io=self.io, - projected_schema=self.projection(), - row_filter=self.row_filter, - case_sensitive=self.case_sensitive, - ) - tbl = arrow_scan.to_table([task]) - res += len(tbl) - return res - @dataclass(frozen=True) class WriteTask: From b5d3363c8f16698cf93ff458e2856515bc11ef47 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Wed, 21 May 2025 18:52:04 +0100 Subject: [PATCH 02/12] Attempt with from ancestor being optional --- pyiceberg/table/__init__.py | 126 ++++++++++++++++++++++++++++++++++- pyiceberg/table/snapshots.py | 9 +++ 2 files changed, 133 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 5f62632e82..79d250f806 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -91,6 +91,7 @@ from pyiceberg.table.snapshots import ( Snapshot, SnapshotLogEntry, + is_ancestor_of, ) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.table.update import ( @@ -1612,7 +1613,7 @@ def to_polars(self) -> pl.DataFrame: return result -class FileBasedTableScan(AbstractTableScan, ABC): +class FileBasedScan(AbstractTableScan, ABC): """A base class for table scans that plan FileScanTasks.""" @abstractmethod @@ -1825,7 +1826,7 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent return set() -class DataScan(FileBasedTableScan, TableScan): +class DataScan(FileBasedScan, TableScan): """A scan of a table's data. Args: @@ -1996,6 +1997,127 @@ def plan_files(self) -> Iterable[FileScanTask]: ] +A = TypeVar("A", bound="IncrementalAppendScan", covariant=True) + + +class IncrementalAppendScan(FileBasedScan, AbstractTableScan): + """An incremental scan of a table's data that accumulates appended data between two snapshots. + + Args: + row_filter: + A string or BooleanExpression that describes the + desired rows + selected_fields: + A tuple of strings representing the column names + to return in the output dataframe. + case_sensitive: + If True column matching is case sensitive + from_snapshot_id: + Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. + Omitting it will default to the eldest ancestor of the "to" snapshot, inclusively. + to_snapshot_id: + Optional ID of the "to" snapshot, to end the incremental scan at, inclusively. + Omitting it will default to the table's current snapshot. + options: + Additional Table properties as a dictionary of + string key value pairs to use for this scan. + limit: + An integer representing the number of rows to + return in the scan result. If None, fetches all + matching rows. + """ + + from_snapshot_id: Optional[int] + to_snapshot_id: Optional[int] + + def __init__( + self, + table_metadata: TableMetadata, + io: FileIO, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] 
= ("*",), + case_sensitive: bool = True, + from_snapshot_id: Optional[int] = None, # Exclusive + to_snapshot_id: Optional[int] = None, # Inclusive + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + ): + super().__init__(table_metadata, io, row_filter, selected_fields, case_sensitive, options, limit) + self.from_snapshot_id = from_snapshot_id + self.to_snapshot_id = to_snapshot_id + + def with_from_snapshot(self: A, from_snapshot_id: Optional[int]) -> A: + """Instructs this scan to look for changes starting from a particular snapshot (exclusive). + + If the start snapshot is not configured, it defaults to the eldest ancestor of the to_snapshot (inclusive). + + Args: + from_snapshot_id: the start snapshot ID (exclusive) + + Returns: + this for method chaining + """ + + return self.update(from_snapshot_id=from_snapshot_id) + + def with_to_snapshot(self: A, to_snapshot_id: Optional[int]) -> A: + """Instructs this scan to look for changes up to a particular snapshot (inclusive). + + If the end snapshot is not configured, it defaults to the current table snapshot (inclusive). + + Args: + to_snapshot_id: the end snapshot ID (inclusive) + + Returns: + this for method chaining + """ + + return self.update(to_snapshot_id=to_snapshot_id) + + def projection(self) -> Schema: + current_schema = self.table_metadata.schema() + + if "*" in self.selected_fields: + return current_schema + + return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive) + + def plan_files(self) -> Iterable[ScanTask]: + current_snapshot = self.table_metadata.current_snapshot() + from_snapshot_id = self.from_snapshot_id + to_snapshot_id = self.to_snapshot_id + + if current_snapshot is None and from_snapshot_id is None and to_snapshot_id is None: + return iter([]) + + if to_snapshot_id is None: + if current_snapshot is None: + raise ValueError("End snapshot of append scan is not set and table has no current snapshot") + + to_snapshot_id = current_snapshot.snapshot_id + elif self._is_snapshot_missing(to_snapshot_id): + raise ValueError(f"End snapshot of append scan not found on table metadata: {to_snapshot_id}") + + if from_snapshot_id is not None: + if from_snapshot_id == to_snapshot_id: + return iter([]) + + if self._is_snapshot_missing(from_snapshot_id): + raise ValueError(f"Start snapshot of append scan not found on table metadata: {from_snapshot_id}") + + if not is_ancestor_of(to_snapshot_id, from_snapshot_id, self.table_metadata): + raise ValueError( + f"Append scan's start snapshot {from_snapshot_id} is not an ancestor of end snapshot {to_snapshot_id}" + ) + + return self._do_plan_files(from_snapshot_id, to_snapshot_id) # type: ignore + + # TODO: Note behaviour change that throws + def _is_snapshot_missing(self, snapshot_id: int) -> bool: + """Returns whether the snapshot ID is missing in the table metadata.""" + return self.table_metadata.snapshot_by_id(snapshot_id) is not None + + @dataclass(frozen=True) class WriteTask: """Task with the parameters for writing a DataFile.""" diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 8d1a24c420..30d3bd7282 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -448,3 +448,12 @@ def ancestors_between( break else: yield from ancestors_of(to_snapshot, table_metadata) + + +def is_ancestor_of(snapshot_id: int, ancestor_snapshot_id: int, table_metadata: TableMetadata) -> bool: + """Returns whether ancestor_snapshot_id is an ancestor of snapshot_id.""" + for snapshot in 
ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata): + if snapshot.snapshot_id == ancestor_snapshot_id: + return True + + return False From ea251e0a4e1c0ed17439d0ca475ac3a25781cc1e Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Wed, 21 May 2025 20:13:34 +0100 Subject: [PATCH 03/12] Attempt at incremental append scan --- pyiceberg/manifest.py | 8 + pyiceberg/table/__init__.py | 390 +++++++++++++++++++++-------------- pyiceberg/table/snapshots.py | 19 +- 3 files changed, 259 insertions(+), 158 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 61cb87e3d8..8b6ba4091d 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -717,6 +717,14 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List if not discard_deleted or entry.status != ManifestEntryStatus.DELETED ] + def __eq__(self, other: Any) -> bool: + """Return the equality of two instances of the ManifestFile class.""" + return self.manifest_path == other.manifest_path if isinstance(other, ManifestFile) else False + + def __hash__(self) -> int: + """Return the hash of manifest_path.""" + return hash(self.manifest_path) + @cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list)) def _manifests(io: FileIO, manifest_list: str) -> Tuple[ManifestFile, ...]: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 79d250f806..5992b232e0 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -69,6 +69,7 @@ DataFileContent, ManifestContent, ManifestEntry, + ManifestEntryStatus, ManifestFile, ) from pyiceberg.partitioning import ( @@ -89,8 +90,10 @@ ) from pyiceberg.table.refs import SnapshotRef from pyiceberg.table.snapshots import ( + Operation, Snapshot, SnapshotLogEntry, + ancestors_between_ids, is_ancestor_of, ) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder @@ -1850,79 +1853,6 @@ class DataScan(FileBasedScan, TableScan): matching rows. """ - def _build_partition_projection(self, spec_id: int) -> BooleanExpression: - project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive) - return project(self.row_filter) - - @cached_property - def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: - return KeyDefaultDict(self._build_partition_projection) - - def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]: - spec = self.table_metadata.specs()[spec_id] - return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive) - - def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]: - spec = self.table_metadata.specs()[spec_id] - partition_type = spec.partition_type(self.table_metadata.schema()) - partition_schema = Schema(*partition_type.fields) - partition_expr = self.partition_filters[spec_id] - - # The lambda created here is run in multiple threads. - # So we avoid creating _EvaluatorExpression methods bound to a single - # shared instance across multiple threads. - return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition) - - def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]: - schema = self.table_metadata.schema() - include_empty_files = strtobool(self.options.get("include_empty_files", "false")) - - # The lambda created here is run in multiple threads. 
- # So we avoid creating _InclusiveMetricsEvaluator methods bound to a single - # shared instance across multiple threads. - return lambda data_file: _InclusiveMetricsEvaluator( - schema, - self.row_filter, - self.case_sensitive, - include_empty_files, - ).eval(data_file) - - def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]: - spec = self.table_metadata.specs()[spec_id] - - # The lambda created here is run in multiple threads. - # So we avoid creating _EvaluatorExpression methods bound to a single - # shared instance across multiple threads. - # return lambda data_file: (partition_schema, partition_expr, self.case_sensitive)(data_file.partition) - from pyiceberg.expressions.visitors import residual_evaluator_of - - # assert self.row_filter == False - return lambda datafile: ( - residual_evaluator_of( - spec=spec, - expr=self.row_filter, - case_sensitive=self.case_sensitive, - schema=self.table_metadata.schema(), - ) - ) - - @staticmethod - def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool: - """Ensure that no manifests are loaded that contain deletes that are older than the data. - - Args: - min_sequence_number (int): The minimal sequence number. - manifest (ManifestFile): A ManifestFile that can be either data or deletes. - - Returns: - Boolean indicating if it is either a data file, or a relevant delete file. - """ - return manifest.content == ManifestContent.DATA or ( - # Not interested in deletes that are older than the data - manifest.content == ManifestContent.DELETES - and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number - ) - def plan_files(self) -> Iterable[FileScanTask]: """Plans the relevant files by filtering on the PartitionSpecs. 
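
        For example (an illustrative sketch, assuming an already-loaded ``table``), each returned
        task pairs a data file with the positional deletes to apply when reading it:

            >>> for task in table.scan().plan_files():
            ...     print(task.file.file_path, task.file.record_count, len(task.delete_files))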
@@ -1933,68 +1863,14 @@ def plan_files(self) -> Iterable[FileScanTask]: if not snapshot: return iter([]) - # step 1: filter manifests using partition summaries - # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id - - manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) - - residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator) - - manifests = [ - manifest_file - for manifest_file in snapshot.manifests(self.io) - if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) - ] - - # step 2: filter the data files in each manifest - # this filter depends on the partition spec used to write the manifest file - - partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator) - - min_sequence_number = _min_sequence_number(manifests) - - data_entries: List[ManifestEntry] = [] - positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER) - - executor = ExecutorFactory.get_or_create() - for manifest_entry in chain( - *executor.map( - lambda args: _open_manifest(*args), - [ - ( - self.io, - manifest, - partition_evaluators[manifest.partition_spec_id], - self._build_metrics_evaluator(), - ) - for manifest in manifests - if self._check_sequence_number(min_sequence_number, manifest) - ], - ) - ): - data_file = manifest_entry.data_file - if data_file.content == DataFileContent.DATA: - data_entries.append(manifest_entry) - elif data_file.content == DataFileContent.POSITION_DELETES: - positional_delete_entries.add(manifest_entry) - elif data_file.content == DataFileContent.EQUALITY_DELETES: - raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568") - else: - raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}") - - return [ - FileScanTask( - data_entry.data_file, - delete_files=_match_deletes_to_data_file( - data_entry, - positional_delete_entries, - ), - residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for( - data_entry.data_file.partition - ), - ) - for data_entry in data_entries - ] + return ManifestGroup( + manifests=snapshot.manifests(self.io), + io=self.io, + table_metadata=self.table_metadata, + parsed_row_filter=self.row_filter, + case_sensitive=self.case_sensitive, + options=self.options, + ).plan_files() A = TypeVar("A", bound="IncrementalAppendScan", covariant=True) @@ -2013,8 +1889,8 @@ class IncrementalAppendScan(FileBasedScan, AbstractTableScan): case_sensitive: If True column matching is case sensitive from_snapshot_id: - Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. - Omitting it will default to the eldest ancestor of the "to" snapshot, inclusively. + Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. When the scan is + read, this must be specified. to_snapshot_id: Optional ID of the "to" snapshot, to end the incremental scan at, inclusively. Omitting it will default to the table's current snapshot. 
@@ -2057,7 +1933,6 @@ def with_from_snapshot(self: A, from_snapshot_id: Optional[int]) -> A: Returns: this for method chaining """ - return self.update(from_snapshot_id=from_snapshot_id) def with_to_snapshot(self: A, to_snapshot_id: Optional[int]) -> A: @@ -2071,7 +1946,6 @@ def with_to_snapshot(self: A, to_snapshot_id: Optional[int]) -> A: Returns: this for method chaining """ - return self.update(to_snapshot_id=to_snapshot_id) def projection(self) -> Schema: @@ -2082,13 +1956,38 @@ def projection(self) -> Schema: return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive) - def plan_files(self) -> Iterable[ScanTask]: + def plan_files(self) -> Iterable[FileScanTask]: + to_snapshot_id, from_snapshot_id = self._validate_and_resolve_snapshots() + + append_snapshots: List[Snapshot] = self._appends_between(from_snapshot_id, to_snapshot_id, self.table_metadata) + if len(append_snapshots) == 0: + return iter([]) + append_snapshot_ids: Set[int] = {snapshot.snapshot_id for snapshot in append_snapshots} + + manifests = { + manifest_file + for snapshot in append_snapshots + for manifest_file in snapshot.manifests(self.io) + if manifest_file.content == ManifestContent.DATA and manifest_file.added_snapshot_id in append_snapshot_ids + } + + return ManifestGroup( + manifests=list(manifests), + io=self.io, + table_metadata=self.table_metadata, + parsed_row_filter=self.row_filter, + case_sensitive=self.case_sensitive, + options=self.options, + manifest_entry_filter=lambda manifest_entry: manifest_entry.snapshot_id in append_snapshot_ids + and manifest_entry.status == ManifestEntryStatus.ADDED, + ).plan_files() + + def _validate_and_resolve_snapshots(self) -> tuple[int, int]: current_snapshot = self.table_metadata.current_snapshot() - from_snapshot_id = self.from_snapshot_id to_snapshot_id = self.to_snapshot_id - if current_snapshot is None and from_snapshot_id is None and to_snapshot_id is None: - return iter([]) + if self.from_snapshot_id is None: + raise ValueError("Start snapshot of append scan unspecified, please set from_snapshot_id") if to_snapshot_id is None: if current_snapshot is None: @@ -2098,25 +1997,204 @@ def plan_files(self) -> Iterable[ScanTask]: elif self._is_snapshot_missing(to_snapshot_id): raise ValueError(f"End snapshot of append scan not found on table metadata: {to_snapshot_id}") - if from_snapshot_id is not None: - if from_snapshot_id == to_snapshot_id: - return iter([]) + if self._is_snapshot_missing(self.from_snapshot_id): + raise ValueError(f"Start snapshot of append scan not found on table metadata: {self.from_snapshot_id}") - if self._is_snapshot_missing(from_snapshot_id): - raise ValueError(f"Start snapshot of append scan not found on table metadata: {from_snapshot_id}") - - if not is_ancestor_of(to_snapshot_id, from_snapshot_id, self.table_metadata): - raise ValueError( - f"Append scan's start snapshot {from_snapshot_id} is not an ancestor of end snapshot {to_snapshot_id}" - ) + if not is_ancestor_of(to_snapshot_id, self.from_snapshot_id, self.table_metadata): + raise ValueError( + f"Append scan's start snapshot {self.from_snapshot_id} is not an ancestor of end snapshot {to_snapshot_id}" + ) - return self._do_plan_files(from_snapshot_id, to_snapshot_id) # type: ignore + return self.from_snapshot_id, to_snapshot_id - # TODO: Note behaviour change that throws + # TODO: Note behaviour change from DataScan that we throw def _is_snapshot_missing(self, snapshot_id: int) -> bool: - """Returns whether the snapshot ID is missing in the table metadata.""" + 
"""Return whether the snapshot ID is missing in the table metadata.""" return self.table_metadata.snapshot_by_id(snapshot_id) is not None + @staticmethod + def _appends_between( + from_snapshot_id_exclusive: int, to_snapshot_id_inclusive: int, table_metadata: TableMetadata + ) -> List[Snapshot]: + """Return the list of snapshots that are appends between two snapshot IDs.""" + return [ + snapshot + for snapshot in ancestors_between_ids( + from_snapshot_id_exclusive=from_snapshot_id_exclusive, + to_snapshot_id_inclusive=to_snapshot_id_inclusive, + table_metadata=table_metadata, + ) + if snapshot.summary is not None and snapshot.summary.operation == Operation.APPEND + ] + + +class ManifestGroup: + manifests: List[ManifestFile] + io: FileIO + table_metadata: TableMetadata + parsed_row_filter: BooleanExpression + case_sensitive: bool + options: Properties + manifest_entry_filter: Callable[[ManifestEntry], bool] + + def __init__( + self, + manifests: List[ManifestFile], + io: FileIO, + table_metadata: TableMetadata, + parsed_row_filter: BooleanExpression, + case_sensitive: bool, + options: Properties, + manifest_entry_filter: Callable[[ManifestEntry], bool] = lambda _: True, + ): + self.manifests = manifests + self.io = io + self.table_metadata = table_metadata + self.parsed_row_filter = parsed_row_filter + self.case_sensitive = case_sensitive + self.options = options + self.manifest_entry_filter = manifest_entry_filter + + def plan_files(self) -> Iterable[FileScanTask]: + # step 1: filter manifests using partition summaries + # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id + + manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) + manifests = [ + manifest_file + for manifest_file in self.manifests + if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) + ] + + residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator) + + # step 2: filter the data files in each manifest + # this filter depends on the partition spec used to write the manifest file + + partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator) + + min_sequence_number = _min_sequence_number(manifests) + + data_entries: List[ManifestEntry] = [] + positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER) + + executor = ExecutorFactory.get_or_create() + for manifest_entry in chain( + *executor.map( + lambda args: _open_manifest(*args), + [ + ( + self.io, + manifest, + partition_evaluators[manifest.partition_spec_id], + self._build_metrics_evaluator(), + ) + for manifest in manifests + if self._check_sequence_number(min_sequence_number, manifest) + ], + ) + ): + if not self.manifest_entry_filter(manifest_entry): + continue + + data_file = manifest_entry.data_file + if data_file.content == DataFileContent.DATA: + data_entries.append(manifest_entry) + elif data_file.content == DataFileContent.POSITION_DELETES: + positional_delete_entries.add(manifest_entry) + elif data_file.content == DataFileContent.EQUALITY_DELETES: + raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568") + else: + raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}") + + return [ + FileScanTask( + data_entry.data_file, + delete_files=_match_deletes_to_data_file( + 
data_entry, + positional_delete_entries, + ), + residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for( + data_entry.data_file.partition + ), + ) + for data_entry in data_entries + ] + + def _build_partition_projection(self, spec_id: int) -> BooleanExpression: + project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive) + return project(self.parsed_row_filter) + + # TODO: Document that this method was removed + @cached_property + def _partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: + return KeyDefaultDict(self._build_partition_projection) + + def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]: + spec = self.table_metadata.specs()[spec_id] + return manifest_evaluator(spec, self.table_metadata.schema(), self._partition_filters[spec_id], self.case_sensitive) + + def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]: + spec = self.table_metadata.specs()[spec_id] + partition_type = spec.partition_type(self.table_metadata.schema()) + partition_schema = Schema(*partition_type.fields) + partition_expr = self._partition_filters[spec_id] + + # The lambda created here is run in multiple threads. + # So we avoid creating _EvaluatorExpression methods bound to a single + # shared instance across multiple threads. + return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition) + + def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]: + schema = self.table_metadata.schema() + include_empty_files = strtobool(self.options.get("include_empty_files", "false")) + + # The lambda created here is run in multiple threads. + # So we avoid creating _InclusiveMetricsEvaluator methods bound to a single + # shared instance across multiple threads. + return lambda data_file: _InclusiveMetricsEvaluator( + schema, + self.parsed_row_filter, + self.case_sensitive, + include_empty_files, + ).eval(data_file) + + def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]: + spec = self.table_metadata.specs()[spec_id] + + # The lambda created here is run in multiple threads. + # So we avoid creating _EvaluatorExpression methods bound to a single + # shared instance across multiple threads. + # return lambda data_file: (partition_schema, partition_expr, self.case_sensitive)(data_file.partition) + from pyiceberg.expressions.visitors import residual_evaluator_of + + # assert self.row_filter == False + return lambda datafile: ( + residual_evaluator_of( + spec=spec, + expr=self.parsed_row_filter, + case_sensitive=self.case_sensitive, + schema=self.table_metadata.schema(), + ) + ) + + @staticmethod + def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool: + """Ensure that no manifests are loaded that contain deletes that are older than the data. + + Args: + min_sequence_number (int): The minimal sequence number. + manifest (ManifestFile): A ManifestFile that can be either data or deletes. + + Returns: + Boolean indicating if it is either a data file, or a relevant delete file. 
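+
+        Note: IncrementalAppendScan pre-filters its manifests to DATA content, so this check only
+        filters manifests for scans, such as DataScan, that also plan DELETES manifests.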
+ """ + return manifest.content == ManifestContent.DATA or ( + # Not interested in deletes that are older than the data + manifest.content == ManifestContent.DELETES + and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number + ) + @dataclass(frozen=True) class WriteTask: diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 30d3bd7282..1a2dc4d9f0 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -440,7 +440,7 @@ def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMeta def ancestors_between( from_snapshot: Optional[Snapshot], to_snapshot: Snapshot, table_metadata: TableMetadata ) -> Iterable[Snapshot]: - """Get the ancestors of and including the given snapshot between the to and from snapshots.""" + """Get the ancestors of and including the given snapshot between the to and from snapshots, both inclusively.""" if from_snapshot is not None: for snapshot in ancestors_of(to_snapshot, table_metadata): yield snapshot @@ -450,8 +450,23 @@ def ancestors_between( yield from ancestors_of(to_snapshot, table_metadata) +def ancestors_between_ids( + from_snapshot_id_exclusive: Optional[int], to_snapshot_id_inclusive: int, table_metadata: TableMetadata +) -> Iterable[Snapshot]: + """Return the ancestors of and including the given "to" snapshot, up to but not including the "from" snapshot.""" + # TODO: Test equal from and to snapshot IDs + if from_snapshot_id_exclusive is not None: + for snapshot in ancestors_of(table_metadata.snapshot_by_id(to_snapshot_id_inclusive), table_metadata): + if snapshot.snapshot_id == from_snapshot_id_exclusive: + break + + yield snapshot + else: + yield from ancestors_of(table_metadata.snapshot_by_id(to_snapshot_id_inclusive), table_metadata) + + def is_ancestor_of(snapshot_id: int, ancestor_snapshot_id: int, table_metadata: TableMetadata) -> bool: - """Returns whether ancestor_snapshot_id is an ancestor of snapshot_id.""" + """Return whether ancestor_snapshot_id is an ancestor of snapshot_id.""" for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata): if snapshot.snapshot_id == ancestor_snapshot_id: return True From d47c2069285fd1179f1ea888f55ed3194db627ce Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Wed, 21 May 2025 23:56:17 +0100 Subject: [PATCH 04/12] Add tests and fix bugs --- dev/provision.py | 55 +++++++++++ pyiceberg/table/__init__.py | 67 ++++++++++++- pyiceberg/table/snapshots.py | 11 ++- tests/integration/test_reads.py | 163 ++++++++++++++++++++++++++++++++ tests/table/test_snapshots.py | 43 +++++++++ 5 files changed, 331 insertions(+), 8 deletions(-) diff --git a/dev/provision.py b/dev/provision.py index 837189204e..ef58705fa8 100644 --- a/dev/provision.py +++ b/dev/provision.py @@ -413,3 +413,58 @@ ) spark.sql(f"ALTER TABLE {catalog_name}.default.test_empty_scan_ordered_str WRITE ORDERED BY id") spark.sql(f"INSERT INTO {catalog_name}.default.test_empty_scan_ordered_str VALUES 'a', 'c'") + + spark.sql( + f""" + CREATE OR REPLACE TABLE {catalog_name}.default.test_incremental_read ( + dt date, + number integer, + letter string + ) + USING iceberg + TBLPROPERTIES ( + 'format-version'='2' + ); + """ + ) + + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_incremental_read + VALUES (CAST('2022-03-01' AS date), 1, 'a') + """ + ) + + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_incremental_read + VALUES (CAST('2022-03-01' AS date), 2, 'b') + """ + ) + + spark.sql( + f""" + INSERT INTO 
{catalog_name}.default.test_incremental_read + VALUES (CAST('2022-03-02' AS date), 3, 'c'), (CAST('2022-03-02' AS date), 4, 'b') + """ + ) + + spark.sql( + f""" + DELETE FROM {catalog_name}.default.test_incremental_read + WHERE number = 2 + """ + ) + + # https://github.com/apache/iceberg/issues/1092#issuecomment-638432848 / https://github.com/apache/iceberg/issues/3747#issuecomment-1145419407 + # REPLACE TABLE requires certain Hive server configuration + if catalog_name != "hive": + # Replace to break snapshot lineage: + spark.sql( + f""" + REPLACE TABLE {catalog_name}.default.test_incremental_read + USING iceberg + TBLPROPERTIES ('format-version'='2') + AS SELECT number, letter FROM {catalog_name}.default.test_incremental_read + """ + ) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 5992b232e0..d36f77234e 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1096,6 +1096,61 @@ def scan( limit=limit, ) + # TODO: Consider more concise name + def incremental_append_scan( + self, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] = ("*",), + case_sensitive: bool = True, + from_snapshot_id: Optional[int] = None, # Exclusive + to_snapshot_id: Optional[int] = None, # Inclusive + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + ) -> IncrementalAppendScan: + """Fetch an IncrementalAppendScan based on the table's current metadata. + + The incremental append scan can be used to project the table's data + from append snapshots within a snapshot range and that matches the + provided row_filter onto the table's current schema + + Args: + row_filter: + A string or BooleanExpression that describes the + desired rows + selected_fields: + A tuple of strings representing the column names + to return in the output dataframe. + case_sensitive: + If True column matching is case sensitive + from_snapshot_id: + Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. This can be set + on the IncrementalAppendScan object returned, but ultimately must not be None. + to_snapshot_id: + Optional ID of the "to" snapshot, to end the incremental scan at, inclusively. This can be set on the + IncrementalAppendScan object returned. Ultimately, it will default to the table's current snapshot. + options: + Additional Table properties as a dictionary of + string key value pairs to use for this scan. + limit: + An integer representing the number of rows to + return in the scan result. If None, fetches all + matching rows. + + Returns: + An IncrementalAppendScan based on the table's current metadata and provided parameters. + """ + return IncrementalAppendScan( + table_metadata=self.metadata, + io=self.io, + row_filter=row_filter, + selected_fields=selected_fields, + case_sensitive=case_sensitive, + from_snapshot_id=from_snapshot_id, + to_snapshot_id=to_snapshot_id, + options=options, + limit=limit, + ) + @property def format_version(self) -> TableVersion: return self.metadata.format_version @@ -1890,7 +1945,7 @@ class IncrementalAppendScan(FileBasedScan, AbstractTableScan): If True column matching is case sensitive from_snapshot_id: Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. When the scan is - read, this must be specified. + ultimately planned, this must not be None. to_snapshot_id: Optional ID of the "to" snapshot, to end the incremental scan at, inclusively. Omitting it will default to the table's current snapshot. 
@@ -1922,7 +1977,7 @@ def __init__( self.from_snapshot_id = from_snapshot_id self.to_snapshot_id = to_snapshot_id - def with_from_snapshot(self: A, from_snapshot_id: Optional[int]) -> A: + def from_snapshot(self: A, from_snapshot_id: Optional[int]) -> A: """Instructs this scan to look for changes starting from a particular snapshot (exclusive). If the start snapshot is not configured, it defaults to the eldest ancestor of the to_snapshot (inclusive). @@ -1935,7 +1990,7 @@ def with_from_snapshot(self: A, from_snapshot_id: Optional[int]) -> A: """ return self.update(from_snapshot_id=from_snapshot_id) - def with_to_snapshot(self: A, to_snapshot_id: Optional[int]) -> A: + def to_snapshot(self: A, to_snapshot_id: Optional[int]) -> A: """Instructs this scan to look for changes up to a particular snapshot (inclusive). If the end snapshot is not configured, it defaults to the current table snapshot (inclusive). @@ -1957,11 +2012,12 @@ def projection(self) -> Schema: return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive) def plan_files(self) -> Iterable[FileScanTask]: - to_snapshot_id, from_snapshot_id = self._validate_and_resolve_snapshots() + from_snapshot_id, to_snapshot_id = self._validate_and_resolve_snapshots() append_snapshots: List[Snapshot] = self._appends_between(from_snapshot_id, to_snapshot_id, self.table_metadata) if len(append_snapshots) == 0: return iter([]) + append_snapshot_ids: Set[int] = {snapshot.snapshot_id for snapshot in append_snapshots} manifests = { @@ -2010,7 +2066,7 @@ def _validate_and_resolve_snapshots(self) -> tuple[int, int]: # TODO: Note behaviour change from DataScan that we throw def _is_snapshot_missing(self, snapshot_id: int) -> bool: """Return whether the snapshot ID is missing in the table metadata.""" - return self.table_metadata.snapshot_by_id(snapshot_id) is not None + return self.table_metadata.snapshot_by_id(snapshot_id) is None @staticmethod def _appends_between( @@ -2178,6 +2234,7 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu ) ) + # TODO: Document that this method was removed from DataScan and it was made static @staticmethod def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool: """Ensure that no manifests are loaded that contain deletes that are older than the data. diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 1a2dc4d9f0..f339f64fe6 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -451,10 +451,15 @@ def ancestors_between( def ancestors_between_ids( - from_snapshot_id_exclusive: Optional[int], to_snapshot_id_inclusive: int, table_metadata: TableMetadata + from_snapshot_id_exclusive: Optional[int], + to_snapshot_id_inclusive: int, + table_metadata: TableMetadata, ) -> Iterable[Snapshot]: - """Return the ancestors of and including the given "to" snapshot, up to but not including the "from" snapshot.""" - # TODO: Test equal from and to snapshot IDs + """Return the ancestors of and including the given "to" snapshot, up to but not including the "from" snapshot. + + If from_snapshot_id_exclusive is None or no ancestors of the "to" snapshot match it, all ancestors of the "to" + snapshot are returned. 
+ """ if from_snapshot_id_exclusive is not None: for snapshot in ancestors_of(table_metadata.snapshot_by_id(to_snapshot_id_inclusive), table_metadata): if snapshot.snapshot_id == from_snapshot_id_exclusive: diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 5ac5162f8e..a97fd22b0a 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -1003,3 +1003,166 @@ def test_scan_with_datetime(catalog: Catalog) -> None: df = table.scan(row_filter=LessThan("datetime", yesterday)).to_pandas() assert len(df) == 0 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_append_only(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = ( + test_table.incremental_append_scan() + # Only "append"-operation snapshots occurred in this range + .from_snapshot(test_table.snapshots()[0].snapshot_id) + .to_snapshot(test_table.snapshots()[2].snapshot_id) + ) + + assert len(list(scan.plan_files())) == 2 + + # Check various read methods + assert len(scan.to_arrow()) == 3 + assert len(scan.to_arrow_batch_reader().read_all()) == 3 + assert len(scan.to_pandas()) == 3 + assert len(scan.to_polars()) == 3 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_ignores_non_append_snapshots(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = test_table.incremental_append_scan( + from_snapshot_id=test_table.snapshots()[0].snapshot_id, + # This is a "delete"-operation snapshot, that should be ignored by the append scan + to_snapshot_id=test_table.snapshots()[3].snapshot_id, + ) + + assert len(list(scan.plan_files())) == 2 + assert len(scan.to_arrow()) == 3 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_uses_current_schema(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = ( + test_table.incremental_append_scan() + .from_snapshot(test_table.snapshots()[0].snapshot_id) + .to_snapshot(test_table.snapshots()[2].snapshot_id) + ) + + # The schema within the snapshot range above included an extra date field, but the table was then replaced, + # removing it. An append scan always uses the current schema of the table. 
+ expected_schema = pa.schema( + [ + pa.field("number", pa.int32()), + pa.field("letter", pa.string()), + ] + ) + + result_table = scan.to_arrow() + assert result_table.schema.equals(expected_schema) + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_row_filter(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = ( + test_table.incremental_append_scan(row_filter=EqualTo("letter", "b")) + .from_snapshot(test_table.snapshots()[0].snapshot_id) + .to_snapshot(test_table.snapshots()[2].snapshot_id) + ) + + # This filter should match against the only row added in snapshots[1] and one of the two rows added in snapshots[2] + assert len(list(scan.plan_files())) == 2 + assert len(scan.to_arrow()) == 2 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_selected_fields(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + scan = ( + test_table.incremental_append_scan(selected_fields=("number",)) + .from_snapshot(test_table.snapshots()[0].snapshot_id) + .to_snapshot(test_table.snapshots()[2].snapshot_id) + ) + + expected_schema = pa.schema( + [ + pa.field("number", pa.int32()), + ] + ) + + result_table = scan.to_arrow() + assert result_table.schema.equals(expected_schema) + assert sorted(result_table["number"].to_pylist()) == [2, 3, 4] + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive")]) +def test_incremental_append_scan_to_snapshot_defaults_to_current(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + assert len(test_table.incremental_append_scan().from_snapshot(test_table.snapshots()[0].snapshot_id).to_arrow()) == 3 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_equal_from_and_to_snapshots(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + snapshot_id = test_table.snapshots()[0].snapshot_id + + # Exclusive-inclusive semantics mean an empty table should be returned if equal from and to snapshots are specified + assert len(test_table.incremental_append_scan().from_snapshot(snapshot_id).to_snapshot(snapshot_id).to_arrow()) == 0 + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")]) +def test_incremental_append_scan_throws_on_disconnected_snapshots(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + from_id = test_table.snapshots()[0].snapshot_id + to_id = test_table.snapshots()[4].snapshot_id + + with pytest.raises(ValueError) as e: + test_table.incremental_append_scan( + from_snapshot_id=from_id, + # A table replace occurred just before this snapshot, breaking snapshot lineage / incremental-ity + to_snapshot_id=to_id, + ).plan_files() + + assert f"Append scan's start snapshot {from_id} is not an ancestor of end snapshot {to_id}" in str(e.value) + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def 
test_incremental_append_scan_throws_on_missing_snapshot_ids(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_incremental_read") + + # from_snapshot_id not specified + with pytest.raises(ValueError) as e: + test_table.incremental_append_scan( + to_snapshot_id=test_table.snapshots()[0].snapshot_id, + ).plan_files() + assert "Start snapshot of append scan unspecified, please set from_snapshot_id" in str(e.value) + + # from_snapshot_id missing from metadata + with pytest.raises(ValueError) as e: + test_table.incremental_append_scan( + from_snapshot_id=42, + to_snapshot_id=test_table.snapshots()[0].snapshot_id, + ).plan_files() + assert "Start snapshot of append scan not found on table metadata: 42" in str(e.value) + + # to_snapshot_id missing from metadata + with pytest.raises(ValueError) as e: + test_table.incremental_append_scan( + from_snapshot_id=test_table.snapshots()[0].snapshot_id, + to_snapshot_id=42, + ).plan_files() + assert "End snapshot of append scan not found on table metadata: 42" in str(e.value) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index b71d92aa55..aae31f62f3 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -29,7 +29,9 @@ SnapshotSummaryCollector, Summary, ancestors_between, + ancestors_between_ids, ancestors_of, + is_ancestor_of, update_snapshot_summaries, ) from pyiceberg.transforms import IdentityTransform @@ -434,3 +436,44 @@ def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None: ) == 2000 ) + + +def test_is_ancestor_of(table_v2: Table) -> None: + snapshot_id = 3055729675574597004 + ancestor_snapshot_id = 3051729675574597004 + + assert is_ancestor_of(snapshot_id, ancestor_snapshot_id, table_v2.metadata) + assert not is_ancestor_of(ancestor_snapshot_id, snapshot_id, table_v2.metadata) + + +def test_ancestors_between_ids(table_v2: Table) -> None: + snapshot_id = 3055729675574597004 + ancestor_snapshot_id = 3051729675574597004 + + result = list(ancestors_between_ids(ancestor_snapshot_id, snapshot_id, table_v2.metadata)) + ids = [ancestor.snapshot_id for ancestor in result] + + # Exclusive-inclusive semantics mean just snapshot_id should be returned + assert ids == [snapshot_id] + + +def test_ancestors_between_equal_ids(table_v2: Table) -> None: + snapshot_id = 3055729675574597004 + + result = list(ancestors_between_ids(snapshot_id, snapshot_id, table_v2.metadata)) + assert result == [] + + +def test_ancestors_between_ids_missing_from_snapshot(table_v2: Table) -> None: + snapshot_id = 3055729675574597004 + ancestor_snapshot_id = 3051729675574597004 + + result = list( + ancestors_between_ids( + from_snapshot_id_exclusive=None, to_snapshot_id_inclusive=snapshot_id, table_metadata=table_v2.metadata + ) + ) + ids = [ancestor.snapshot_id for ancestor in result] + + # With a from snapshot missing, all ancestors should be returned + assert ids == [snapshot_id, ancestor_snapshot_id] From 61b4f196b60b6fe3d79dc6d25a079e7b9ecb2f8a Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Thu, 22 May 2025 11:21:08 +0100 Subject: [PATCH 05/12] Rename methods, add TODOs and introduce incremental scan superclass --- pyiceberg/table/__init__.py | 142 +++++++++++++++++++------------- tests/integration/test_reads.py | 48 ++++++----- 2 files changed, 114 insertions(+), 76 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index d36f77234e..d366376067 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1102,8 +1102,8 @@ def 
incremental_append_scan( row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, selected_fields: Tuple[str, ...] = ("*",), case_sensitive: bool = True, - from_snapshot_id: Optional[int] = None, # Exclusive - to_snapshot_id: Optional[int] = None, # Inclusive + from_snapshot_id_exclusive: Optional[int] = None, + to_snapshot_id_inclusive: Optional[int] = None, options: Properties = EMPTY_DICT, limit: Optional[int] = None, ) -> IncrementalAppendScan: @@ -1122,10 +1122,10 @@ def incremental_append_scan( to return in the output dataframe. case_sensitive: If True column matching is case sensitive - from_snapshot_id: + from_snapshot_id_exclusive: Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. This can be set on the IncrementalAppendScan object returned, but ultimately must not be None. - to_snapshot_id: + to_snapshot_id_inclusive: Optional ID of the "to" snapshot, to end the incremental scan at, inclusively. This can be set on the IncrementalAppendScan object returned. Ultimately, it will default to the table's current snapshot. options: @@ -1145,8 +1145,8 @@ def incremental_append_scan( row_filter=row_filter, selected_fields=selected_fields, case_sensitive=case_sensitive, - from_snapshot_id=from_snapshot_id, - to_snapshot_id=to_snapshot_id, + from_snapshot_id_exclusive=from_snapshot_id_exclusive, + to_snapshot_id_inclusive=to_snapshot_id_inclusive, options=options, limit=limit, ) @@ -1928,38 +1928,14 @@ def plan_files(self) -> Iterable[FileScanTask]: ).plan_files() -A = TypeVar("A", bound="IncrementalAppendScan", covariant=True) +A = TypeVar("A", bound="IncrementalScan", covariant=True) -class IncrementalAppendScan(FileBasedScan, AbstractTableScan): - """An incremental scan of a table's data that accumulates appended data between two snapshots. - - Args: - row_filter: - A string or BooleanExpression that describes the - desired rows - selected_fields: - A tuple of strings representing the column names - to return in the output dataframe. - case_sensitive: - If True column matching is case sensitive - from_snapshot_id: - Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. When the scan is - ultimately planned, this must not be None. - to_snapshot_id: - Optional ID of the "to" snapshot, to end the incremental scan at, inclusively. - Omitting it will default to the table's current snapshot. - options: - Additional Table properties as a dictionary of - string key value pairs to use for this scan. - limit: - An integer representing the number of rows to - return in the scan result. If None, fetches all - matching rows. - """ +class IncrementalScan(AbstractTableScan, ABC): + """A base class for incremental scans.""" - from_snapshot_id: Optional[int] - to_snapshot_id: Optional[int] + from_snapshot_id_exclusive: Optional[int] + to_snapshot_id_inclusive: Optional[int] def __init__( self, @@ -1968,40 +1944,36 @@ def __init__( row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, selected_fields: Tuple[str, ...] 
= ("*",), case_sensitive: bool = True, - from_snapshot_id: Optional[int] = None, # Exclusive - to_snapshot_id: Optional[int] = None, # Inclusive + from_snapshot_id_exclusive: Optional[int] = None, + to_snapshot_id_inclusive: Optional[int] = None, options: Properties = EMPTY_DICT, limit: Optional[int] = None, ): super().__init__(table_metadata, io, row_filter, selected_fields, case_sensitive, options, limit) - self.from_snapshot_id = from_snapshot_id - self.to_snapshot_id = to_snapshot_id + self.from_snapshot_id_exclusive = from_snapshot_id_exclusive + self.to_snapshot_id_inclusive = to_snapshot_id_inclusive - def from_snapshot(self: A, from_snapshot_id: Optional[int]) -> A: + def from_snapshot_exclusive(self: A, from_snapshot_id_exclusive: Optional[int]) -> A: """Instructs this scan to look for changes starting from a particular snapshot (exclusive). - If the start snapshot is not configured, it defaults to the eldest ancestor of the to_snapshot (inclusive). - Args: - from_snapshot_id: the start snapshot ID (exclusive) + from_snapshot_id_exclusive: the start snapshot ID (exclusive) Returns: this for method chaining """ - return self.update(from_snapshot_id=from_snapshot_id) + return self.update(from_snapshot_id_exclusive=from_snapshot_id_exclusive) - def to_snapshot(self: A, to_snapshot_id: Optional[int]) -> A: + def to_snapshot_inclusive(self: A, to_snapshot_id_inclusive: Optional[int]) -> A: """Instructs this scan to look for changes up to a particular snapshot (inclusive). - If the end snapshot is not configured, it defaults to the current table snapshot (inclusive). - Args: - to_snapshot_id: the end snapshot ID (inclusive) + to_snapshot_id_inclusive: the end snapshot ID (inclusive) Returns: this for method chaining """ - return self.update(to_snapshot_id=to_snapshot_id) + return self.update(to_snapshot_id_inclusive=to_snapshot_id_inclusive) def projection(self) -> Schema: current_schema = self.table_metadata.schema() @@ -2011,6 +1983,61 @@ def projection(self) -> Schema: return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive) + +class IncrementalAppendScan(IncrementalScan, FileBasedScan): + """An incremental scan of a table's data that accumulates appended data between two snapshots. + + Args: + row_filter: + A string or BooleanExpression that describes the + desired rows + selected_fields: + A tuple of strings representing the column names + to return in the output dataframe. + case_sensitive: + If True column matching is case sensitive + from_snapshot_id_exclusive: + Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. When the scan is + ultimately planned, this must not be None. + to_snapshot_id_inclusive: + Optional ID of the "to" snapshot, to end the incremental scan at, inclusively. + Omitting it will default to the table's current snapshot. + options: + Additional Table properties as a dictionary of + string key value pairs to use for this scan. + limit: + An integer representing the number of rows to + return in the scan result. If None, fetches all + matching rows. + """ + + from_snapshot_id_exclusive: Optional[int] + to_snapshot_id_inclusive: Optional[int] + + def __init__( + self, + table_metadata: TableMetadata, + io: FileIO, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] 
= ("*",), + case_sensitive: bool = True, + from_snapshot_id_exclusive: Optional[int] = None, + to_snapshot_id_inclusive: Optional[int] = None, + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + ): + super().__init__( + table_metadata, + io, + row_filter, + selected_fields, + case_sensitive, + from_snapshot_id_exclusive, + to_snapshot_id_inclusive, + options, + limit, + ) + def plan_files(self) -> Iterable[FileScanTask]: from_snapshot_id, to_snapshot_id = self._validate_and_resolve_snapshots() @@ -2040,10 +2067,10 @@ def plan_files(self) -> Iterable[FileScanTask]: def _validate_and_resolve_snapshots(self) -> tuple[int, int]: current_snapshot = self.table_metadata.current_snapshot() - to_snapshot_id = self.to_snapshot_id + to_snapshot_id = self.to_snapshot_id_inclusive - if self.from_snapshot_id is None: - raise ValueError("Start snapshot of append scan unspecified, please set from_snapshot_id") + if self.from_snapshot_id_exclusive is None: + raise ValueError("Start snapshot of append scan unspecified, please set from_snapshot_id_exclusive") if to_snapshot_id is None: if current_snapshot is None: @@ -2053,15 +2080,15 @@ def _validate_and_resolve_snapshots(self) -> tuple[int, int]: elif self._is_snapshot_missing(to_snapshot_id): raise ValueError(f"End snapshot of append scan not found on table metadata: {to_snapshot_id}") - if self._is_snapshot_missing(self.from_snapshot_id): - raise ValueError(f"Start snapshot of append scan not found on table metadata: {self.from_snapshot_id}") + if self._is_snapshot_missing(self.from_snapshot_id_exclusive): + raise ValueError(f"Start snapshot of append scan not found on table metadata: {self.from_snapshot_id_exclusive}") - if not is_ancestor_of(to_snapshot_id, self.from_snapshot_id, self.table_metadata): + if not is_ancestor_of(to_snapshot_id, self.from_snapshot_id_exclusive, self.table_metadata): raise ValueError( - f"Append scan's start snapshot {self.from_snapshot_id} is not an ancestor of end snapshot {to_snapshot_id}" + f"Append scan's start snapshot {self.from_snapshot_id_exclusive} is not an ancestor of end snapshot {to_snapshot_id}" ) - return self.from_snapshot_id, to_snapshot_id + return self.from_snapshot_id_exclusive, to_snapshot_id # TODO: Note behaviour change from DataScan that we throw def _is_snapshot_missing(self, snapshot_id: int) -> bool: @@ -2182,6 +2209,7 @@ def _build_partition_projection(self, spec_id: int) -> BooleanExpression: return project(self.parsed_row_filter) # TODO: Document that this method was removed + # TODO: Or probably: Don't move it and think. 
We should cache on the scan classes themselves, not here @cached_property def _partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: return KeyDefaultDict(self._build_partition_projection) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index a97fd22b0a..429f51b14f 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -1013,8 +1013,8 @@ def test_incremental_append_scan_append_only(catalog: Catalog) -> None: scan = ( test_table.incremental_append_scan() # Only "append"-operation snapshots occurred in this range - .from_snapshot(test_table.snapshots()[0].snapshot_id) - .to_snapshot(test_table.snapshots()[2].snapshot_id) + .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id) + .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id) ) assert len(list(scan.plan_files())) == 2 @@ -1032,9 +1032,9 @@ def test_incremental_append_scan_ignores_non_append_snapshots(catalog: Catalog) test_table = catalog.load_table("default.test_incremental_read") scan = test_table.incremental_append_scan( - from_snapshot_id=test_table.snapshots()[0].snapshot_id, + from_snapshot_id_exclusive=test_table.snapshots()[0].snapshot_id, # This is a "delete"-operation snapshot, that should be ignored by the append scan - to_snapshot_id=test_table.snapshots()[3].snapshot_id, + to_snapshot_id_inclusive=test_table.snapshots()[3].snapshot_id, ) assert len(list(scan.plan_files())) == 2 @@ -1048,8 +1048,8 @@ def test_incremental_append_scan_uses_current_schema(catalog: Catalog) -> None: scan = ( test_table.incremental_append_scan() - .from_snapshot(test_table.snapshots()[0].snapshot_id) - .to_snapshot(test_table.snapshots()[2].snapshot_id) + .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id) + .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id) ) # The schema within the snapshot range above included an extra date field, but the table was then replaced, @@ -1072,8 +1072,8 @@ def test_incremental_append_scan_row_filter(catalog: Catalog) -> None: scan = ( test_table.incremental_append_scan(row_filter=EqualTo("letter", "b")) - .from_snapshot(test_table.snapshots()[0].snapshot_id) - .to_snapshot(test_table.snapshots()[2].snapshot_id) + .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id) + .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id) ) # This filter should match against the only row added in snapshots[1] and one of the two rows added in snapshots[2] @@ -1088,8 +1088,8 @@ def test_incremental_append_scan_selected_fields(catalog: Catalog) -> None: scan = ( test_table.incremental_append_scan(selected_fields=("number",)) - .from_snapshot(test_table.snapshots()[0].snapshot_id) - .to_snapshot(test_table.snapshots()[2].snapshot_id) + .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id) + .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id) ) expected_schema = pa.schema( @@ -1108,7 +1108,9 @@ def test_incremental_append_scan_selected_fields(catalog: Catalog) -> None: def test_incremental_append_scan_to_snapshot_defaults_to_current(catalog: Catalog) -> None: test_table = catalog.load_table("default.test_incremental_read") - assert len(test_table.incremental_append_scan().from_snapshot(test_table.snapshots()[0].snapshot_id).to_arrow()) == 3 + assert ( + len(test_table.incremental_append_scan().from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id).to_arrow()) == 3 + ) @pytest.mark.integration @@ -1118,7 +1120,15 @@ def 
test_incremental_append_scan_equal_from_and_to_snapshots(catalog: Catalog) - snapshot_id = test_table.snapshots()[0].snapshot_id # Exclusive-inclusive semantics mean an empty table should be returned if equal from and to snapshots are specified - assert len(test_table.incremental_append_scan().from_snapshot(snapshot_id).to_snapshot(snapshot_id).to_arrow()) == 0 + assert ( + len( + test_table.incremental_append_scan() + .from_snapshot_exclusive(snapshot_id) + .to_snapshot_inclusive(snapshot_id) + .to_arrow() + ) + == 0 + ) @pytest.mark.integration @@ -1131,9 +1141,9 @@ def test_incremental_append_scan_throws_on_disconnected_snapshots(catalog: Catal with pytest.raises(ValueError) as e: test_table.incremental_append_scan( - from_snapshot_id=from_id, + from_snapshot_id_exclusive=from_id, # A table replace occurred just before this snapshot, breaking snapshot lineage / incremental-ity - to_snapshot_id=to_id, + to_snapshot_id_inclusive=to_id, ).plan_files() assert f"Append scan's start snapshot {from_id} is not an ancestor of end snapshot {to_id}" in str(e.value) @@ -1147,22 +1157,22 @@ def test_incremental_append_scan_throws_on_missing_snapshot_ids(catalog: Catalog # from_snapshot_id not specified with pytest.raises(ValueError) as e: test_table.incremental_append_scan( - to_snapshot_id=test_table.snapshots()[0].snapshot_id, + to_snapshot_id_inclusive=test_table.snapshots()[0].snapshot_id, ).plan_files() assert "Start snapshot of append scan unspecified, please set from_snapshot_id" in str(e.value) # from_snapshot_id missing from metadata with pytest.raises(ValueError) as e: test_table.incremental_append_scan( - from_snapshot_id=42, - to_snapshot_id=test_table.snapshots()[0].snapshot_id, + from_snapshot_id_exclusive=42, + to_snapshot_id_inclusive=test_table.snapshots()[0].snapshot_id, ).plan_files() assert "Start snapshot of append scan not found on table metadata: 42" in str(e.value) # to_snapshot_id missing from metadata with pytest.raises(ValueError) as e: test_table.incremental_append_scan( - from_snapshot_id=test_table.snapshots()[0].snapshot_id, - to_snapshot_id=42, + from_snapshot_id_exclusive=test_table.snapshots()[0].snapshot_id, + to_snapshot_id_inclusive=42, ).plan_files() assert "End snapshot of append scan not found on table metadata: 42" in str(e.value) From 2747e1976e9c4f1a52bdb09bd7ebd5827ce33684 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Thu, 22 May 2025 11:22:05 +0100 Subject: [PATCH 06/12] Remove unused properties --- pyiceberg/table/__init__.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index d366376067..0ea25b7232 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2011,9 +2011,6 @@ class IncrementalAppendScan(IncrementalScan, FileBasedScan): matching rows. 
""" - from_snapshot_id_exclusive: Optional[int] - to_snapshot_id_inclusive: Optional[int] - def __init__( self, table_metadata: TableMetadata, From 08ad36f57c19b0674db61330a24d7c77b86068db Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Thu, 22 May 2025 14:34:24 +0100 Subject: [PATCH 07/12] Rearrange classes --- pyiceberg/table/__init__.py | 86 +++++++++++++++---------------------- 1 file changed, 35 insertions(+), 51 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 0ea25b7232..b47307b691 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1674,6 +1674,10 @@ def to_polars(self) -> pl.DataFrame: class FileBasedScan(AbstractTableScan, ABC): """A base class for table scans that plan FileScanTasks.""" + @cached_property + def _manifest_group_planner(self) -> ManifestGroupPlanner: + return ManifestGroupPlanner(self) + @abstractmethod def plan_files(self) -> Iterable[FileScanTask]: ... @@ -1918,14 +1922,12 @@ def plan_files(self) -> Iterable[FileScanTask]: if not snapshot: return iter([]) - return ManifestGroup( - manifests=snapshot.manifests(self.io), - io=self.io, - table_metadata=self.table_metadata, - parsed_row_filter=self.row_filter, - case_sensitive=self.case_sensitive, - options=self.options, - ).plan_files() + return self._manifest_group_planner.plan_files(manifests=snapshot.manifests(self.io)) + + # TODO: Document motivation and un-caching + @property + def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: + return self._manifest_group_planner.partition_filters A = TypeVar("A", bound="IncrementalScan", covariant=True) @@ -2051,16 +2053,11 @@ def plan_files(self) -> Iterable[FileScanTask]: if manifest_file.content == ManifestContent.DATA and manifest_file.added_snapshot_id in append_snapshot_ids } - return ManifestGroup( + return self._manifest_group_planner.plan_files( manifests=list(manifests), - io=self.io, - table_metadata=self.table_metadata, - parsed_row_filter=self.row_filter, - case_sensitive=self.case_sensitive, - options=self.options, manifest_entry_filter=lambda manifest_entry: manifest_entry.snapshot_id in append_snapshot_ids and manifest_entry.status == ManifestEntryStatus.ADDED, - ).plan_files() + ) def _validate_and_resolve_snapshots(self) -> tuple[int, int]: current_snapshot = self.table_metadata.current_snapshot() @@ -2108,42 +2105,31 @@ def _appends_between( ] -class ManifestGroup: - manifests: List[ManifestFile] +class ManifestGroupPlanner: io: FileIO table_metadata: TableMetadata - parsed_row_filter: BooleanExpression + row_filter: BooleanExpression case_sensitive: bool options: Properties - manifest_entry_filter: Callable[[ManifestEntry], bool] - def __init__( + def __init__(self, scan: AbstractTableScan): + self.io = scan.io + self.table_metadata = scan.table_metadata + self.row_filter = scan.row_filter + self.case_sensitive = scan.case_sensitive + self.options = scan.options + + def plan_files( self, manifests: List[ManifestFile], - io: FileIO, - table_metadata: TableMetadata, - parsed_row_filter: BooleanExpression, - case_sensitive: bool, - options: Properties, manifest_entry_filter: Callable[[ManifestEntry], bool] = lambda _: True, - ): - self.manifests = manifests - self.io = io - self.table_metadata = table_metadata - self.parsed_row_filter = parsed_row_filter - self.case_sensitive = case_sensitive - self.options = options - self.manifest_entry_filter = manifest_entry_filter - - def plan_files(self) -> Iterable[FileScanTask]: + ) -> Iterable[FileScanTask]: # step 1: 
filter manifests using partition summaries # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) manifests = [ - manifest_file - for manifest_file in self.manifests - if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) + manifest_file for manifest_file in manifests if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) ] residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator) @@ -2174,7 +2160,7 @@ def plan_files(self) -> Iterable[FileScanTask]: ], ) ): - if not self.manifest_entry_filter(manifest_entry): + if not manifest_entry_filter(manifest_entry): continue data_file = manifest_entry.data_file @@ -2201,25 +2187,23 @@ def plan_files(self) -> Iterable[FileScanTask]: for data_entry in data_entries ] - def _build_partition_projection(self, spec_id: int) -> BooleanExpression: - project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive) - return project(self.parsed_row_filter) - - # TODO: Document that this method was removed - # TODO: Or probably: Don't move it and think. We should cache on the scan classes themselves, not here @cached_property - def _partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: + def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: return KeyDefaultDict(self._build_partition_projection) + def _build_partition_projection(self, spec_id: int) -> BooleanExpression: + project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive) + return project(self.row_filter) + def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]: spec = self.table_metadata.specs()[spec_id] - return manifest_evaluator(spec, self.table_metadata.schema(), self._partition_filters[spec_id], self.case_sensitive) + return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive) def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]: spec = self.table_metadata.specs()[spec_id] partition_type = spec.partition_type(self.table_metadata.schema()) partition_schema = Schema(*partition_type.fields) - partition_expr = self._partition_filters[spec_id] + partition_expr = self.partition_filters[spec_id] # The lambda created here is run in multiple threads. # So we avoid creating _EvaluatorExpression methods bound to a single @@ -2235,7 +2219,7 @@ def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]: # shared instance across multiple threads. 
return lambda data_file: _InclusiveMetricsEvaluator( schema, - self.parsed_row_filter, + self.row_filter, self.case_sensitive, include_empty_files, ).eval(data_file) @@ -2253,13 +2237,13 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu return lambda datafile: ( residual_evaluator_of( spec=spec, - expr=self.parsed_row_filter, + expr=self.row_filter, case_sensitive=self.case_sensitive, schema=self.table_metadata.schema(), ) ) - # TODO: Document that this method was removed from DataScan and it was made static + # TODO: Document that this method was was made static @staticmethod def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool: """Ensure that no manifests are loaded that contain deletes that are older than the data. From 99021cc7b981a2c85342c8226752a7702135ecc2 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Thu, 22 May 2025 14:43:35 +0100 Subject: [PATCH 08/12] Nit improvements to tests --- dev/provision.py | 2 +- tests/integration/test_reads.py | 2 +- tests/table/test_snapshots.py | 13 ++++++------- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/dev/provision.py b/dev/provision.py index ef58705fa8..92aa4d8256 100644 --- a/dev/provision.py +++ b/dev/provision.py @@ -457,7 +457,7 @@ ) # https://github.com/apache/iceberg/issues/1092#issuecomment-638432848 / https://github.com/apache/iceberg/issues/3747#issuecomment-1145419407 - # REPLACE TABLE requires certain Hive server configuration + # Don't do replace for Hive catalog as REPLACE TABLE requires certain Hive server configuration if catalog_name != "hive": # Replace to break snapshot lineage: spark.sql( diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 429f51b14f..cf751a214a 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -1159,7 +1159,7 @@ def test_incremental_append_scan_throws_on_missing_snapshot_ids(catalog: Catalog test_table.incremental_append_scan( to_snapshot_id_inclusive=test_table.snapshots()[0].snapshot_id, ).plan_files() - assert "Start snapshot of append scan unspecified, please set from_snapshot_id" in str(e.value) + assert "Start snapshot of append scan unspecified, please set from_snapshot_id_exclusive" in str(e.value) # from_snapshot_id missing from metadata with pytest.raises(ValueError) as e: diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index aae31f62f3..343efd9ae3 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -439,21 +439,19 @@ def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None: def test_is_ancestor_of(table_v2: Table) -> None: - snapshot_id = 3055729675574597004 - ancestor_snapshot_id = 3051729675574597004 + snapshot_id, ancestor_snapshot_id = 3055729675574597004, 3051729675574597004 assert is_ancestor_of(snapshot_id, ancestor_snapshot_id, table_v2.metadata) assert not is_ancestor_of(ancestor_snapshot_id, snapshot_id, table_v2.metadata) def test_ancestors_between_ids(table_v2: Table) -> None: - snapshot_id = 3055729675574597004 - ancestor_snapshot_id = 3051729675574597004 + snapshot_id, ancestor_snapshot_id = 3055729675574597004, 3051729675574597004 result = list(ancestors_between_ids(ancestor_snapshot_id, snapshot_id, table_v2.metadata)) ids = [ancestor.snapshot_id for ancestor in result] - # Exclusive-inclusive semantics mean just snapshot_id should be returned + # Exclusive-inclusive semantics means just 'snapshot_id' should be returned assert ids == [snapshot_id] 
@@ -461,12 +459,13 @@ def test_ancestors_between_equal_ids(table_v2: Table) -> None:
     snapshot_id = 3055729675574597004
 
     result = list(ancestors_between_ids(snapshot_id, snapshot_id, table_v2.metadata))
+
+    # Exclusive-inclusive semantics means no ancestors should be returned
     assert result == []
 
 
 def test_ancestors_between_ids_missing_from_snapshot(table_v2: Table) -> None:
-    snapshot_id = 3055729675574597004
-    ancestor_snapshot_id = 3051729675574597004
+    snapshot_id, ancestor_snapshot_id = 3055729675574597004, 3051729675574597004
 
     result = list(
         ancestors_between_ids(
From aeca1683639c597b9e6ee500252ae17404c3d5ec Mon Sep 17 00:00:00 2001
From: Sreesh Maheshwar
Date: Thu, 22 May 2025 14:57:41 +0100
Subject: [PATCH 09/12] Also test limit on an append scan

---
 tests/integration/test_reads.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index cf751a214a..a9ffb1c3ed 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -1103,6 +1103,21 @@ def test_incremental_append_scan_selected_fields(catalog: Catalog) -> None:
     assert sorted(result_table["number"].to_pylist()) == [2, 3, 4]
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+def test_incremental_append_scan_limit(catalog: Catalog) -> None:
+    test_table = catalog.load_table("default.test_incremental_read")
+
+    scan = (
+        test_table.incremental_append_scan(limit=2)
+        .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id)
+        .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id)
+    )
+
+    # Although three rows were added in the range, the limit of 2 should be applied
+    assert len(scan.to_arrow()) == 2
+
+
 @pytest.mark.integration
 @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive")])
 def test_incremental_append_scan_to_snapshot_defaults_to_current(catalog: Catalog) -> None:
From 6cb35397d8a64379246ffe13b9a996ee710cfea8 Mon Sep 17 00:00:00 2001
From: Sreesh Maheshwar
Date: Thu, 22 May 2025 15:26:16 +0100
Subject: [PATCH 10/12] Add a `count` test

---
 pyiceberg/table/__init__.py     |  1 -
 tests/integration/test_reads.py | 14 ++++++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index b47307b691..09a6fdb862 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1096,7 +1096,6 @@ def scan(
         limit=limit,
     )
 
-    # TODO: Consider more concise name
     def incremental_append_scan(
         self,
         row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index a9ffb1c3ed..e995b6bd48 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -1118,6 +1118,20 @@ def test_incremental_append_scan_limit(catalog: Catalog) -> None:
     assert len(scan.to_arrow()) == 2
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+def test_incremental_append_scan_count(catalog: Catalog) -> None:
+    test_table = catalog.load_table("default.test_incremental_read")
+
+    scan = (
+        test_table.incremental_append_scan()
+        .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id)
+        .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id)
+    )
+
+    assert scan.count() == 3
+
+
 @pytest.mark.integration
 @pytest.mark.parametrize("catalog", 
[pytest.lazy_fixture("session_catalog_hive")]) def test_incremental_append_scan_to_snapshot_defaults_to_current(catalog: Catalog) -> None: From f0ee5cb2054433de974d3b54d9345a658ed0d8c9 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Thu, 22 May 2025 15:58:49 +0100 Subject: [PATCH 11/12] Remove TODO --- pyiceberg/table/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 09a6fdb862..585083eb0d 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2242,7 +2242,6 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu ) ) - # TODO: Document that this method was was made static @staticmethod def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool: """Ensure that no manifests are loaded that contain deletes that are older than the data. From 14cbe274f3765bcc34b0c6d2689c90dad22df399 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Thu, 22 May 2025 16:18:55 +0100 Subject: [PATCH 12/12] Remove TODOs --- pyiceberg/table/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 585083eb0d..9a2ee77e86 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1923,7 +1923,6 @@ def plan_files(self) -> Iterable[FileScanTask]: return self._manifest_group_planner.plan_files(manifests=snapshot.manifests(self.io)) - # TODO: Document motivation and un-caching @property def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: return self._manifest_group_planner.partition_filters @@ -2083,7 +2082,6 @@ def _validate_and_resolve_snapshots(self) -> tuple[int, int]: return self.from_snapshot_id_exclusive, to_snapshot_id - # TODO: Note behaviour change from DataScan that we throw def _is_snapshot_missing(self, snapshot_id: int) -> bool: """Return whether the snapshot ID is missing in the table metadata.""" return self.table_metadata.snapshot_by_id(snapshot_id) is None
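
Taken together, the series leaves Table with an incremental, append-only read path. A minimal usage sketch of that API follows; the catalog name and snapshot indices are illustrative (the fixture table name is borrowed from the integration tests above), while the method and parameter names match those introduced in this series:

    from pyiceberg.catalog import load_catalog
    from pyiceberg.expressions import EqualTo

    catalog = load_catalog("default")  # assumed catalog name
    table = catalog.load_table("default.test_incremental_read")
    snapshots = table.snapshots()

    # Exclusive-inclusive range: rows appended after snapshots[0], up to and
    # including snapshots[2]. row_filter, selected_fields and limit behave as
    # they do on a regular table scan.
    scan = (
        table.incremental_append_scan(row_filter=EqualTo("letter", "b"), limit=2)
        .from_snapshot_exclusive(snapshots[0].snapshot_id)
        .to_snapshot_inclusive(snapshots[2].snapshot_id)
    )
    result = scan.to_arrow()  # or scan.to_pandas(), scan.count(), scan.plan_files()

    # to_snapshot_id_inclusive defaults to the table's current snapshot.
    # Omitting from_snapshot_id_exclusive raises ValueError at planning time,
    # as do snapshot IDs missing from metadata or a broken ancestry between
    # the two endpoints.
    latest = table.incremental_append_scan(
        from_snapshot_id_exclusive=snapshots[0].snapshot_id,
    ).to_arrow()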
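
The exclusive-inclusive semantics asserted in the test_snapshots.py tests follow from walking the snapshot parent chain: start at the inclusive end snapshot and stop before yielding the exclusive start. A rough model of that walk, as a simplification rather than the ancestors_between_ids implementation (which operates on TableMetadata, not a plain dict):

    from typing import Dict, Iterator, Optional

    def walk_ancestors(
        from_snapshot_id_exclusive: Optional[int],
        to_snapshot_id_inclusive: int,
        parents: Dict[int, Optional[int]],  # snapshot_id -> parent_snapshot_id
    ) -> Iterator[int]:
        # Yield IDs from the 'to' snapshot (inclusive) back towards the
        # 'from' snapshot (exclusive); with no 'from', walk to the root.
        current: Optional[int] = to_snapshot_id_inclusive
        while current is not None and current != from_snapshot_id_exclusive:
            yield current
            current = parents.get(current)

    parents = {2: 1, 1: None}  # toy two-snapshot lineage
    assert list(walk_ancestors(1, 2, parents)) == [2]       # just the 'to' snapshot
    assert list(walk_ancestors(2, 2, parents)) == []        # equal IDs yield nothing
    assert list(walk_ancestors(None, 2, parents)) == [2, 1] # no 'from': full history

The three assertions mirror test_ancestors_between_ids, test_ancestors_between_equal_ids and test_ancestors_between_ids_missing_from_snapshot respectively.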
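
The manifest_entry_filter hook on ManifestGroupPlanner.plan_files is what lets IncrementalAppendScan share planning machinery with the snapshot scan: DataScan passes no filter, while the append scan first keeps only DATA manifests whose added_snapshot_id falls in the range, then filters individual entries. A sketch of that entry predicate as the diff constructs it; append_snapshot_ids here is a stand-in for the set resolved from the snapshot range:

    from pyiceberg.manifest import ManifestEntry, ManifestEntryStatus

    append_snapshot_ids = {123, 456}  # hypothetical IDs of in-range "append" snapshots

    def added_in_range(entry: ManifestEntry) -> bool:
        # Mirrors the lambda in IncrementalAppendScan.plan_files: keep only
        # entries that an in-range append snapshot recorded as ADDED.
        return entry.snapshot_id in append_snapshot_ids and entry.status == ManifestEntryStatus.ADDED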