diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 362e9085df..a92d944811 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -866,6 +866,14 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List if not discard_deleted or entry.status != ManifestEntryStatus.DELETED ] + def __eq__(self, other: Any) -> bool: + """Return the equality of two instances of the ManifestFile class.""" + return self.manifest_path == other.manifest_path if isinstance(other, ManifestFile) else False + + def __hash__(self) -> int: + """Return the hash of manifest_path.""" + return hash(self.manifest_path) + @cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list)) def _manifests(io: FileIO, manifest_list: str) -> Tuple[ManifestFile, ...]: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 6c6da2a9b7..e2919cb078 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -69,6 +69,7 @@ DataFileContent, ManifestContent, ManifestEntry, + ManifestEntryStatus, ManifestFile, ) from pyiceberg.partitioning import ( @@ -89,8 +90,11 @@ ) from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef from pyiceberg.table.snapshots import ( + Operation, Snapshot, SnapshotLogEntry, + ancestors_between_ids, + is_ancestor_of, ) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.table.update import ( @@ -1139,6 +1143,60 @@ def scan( limit=limit, ) + def incremental_append_scan( + self, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] = ("*",), + case_sensitive: bool = True, + from_snapshot_id_exclusive: Optional[int] = None, + to_snapshot_id_inclusive: Optional[int] = None, + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + ) -> IncrementalAppendScan: + """Fetch an IncrementalAppendScan based on the table's current metadata. + + The incremental append scan can be used to project the table's data + from append snapshots within a snapshot range and that matches the + provided row_filter onto the table's current schema + + Args: + row_filter: + A string or BooleanExpression that describes the + desired rows + selected_fields: + A tuple of strings representing the column names + to return in the output dataframe. + case_sensitive: + If True column matching is case sensitive + from_snapshot_id_exclusive: + Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. This can be set + on the IncrementalAppendScan object returned, but ultimately must not be None. + to_snapshot_id_inclusive: + Optional ID of the "to" snapshot, to end the incremental scan at, inclusively. This can be set on the + IncrementalAppendScan object returned. Ultimately, it will default to the table's current snapshot. + options: + Additional Table properties as a dictionary of + string key value pairs to use for this scan. + limit: + An integer representing the number of rows to + return in the scan result. If None, fetches all + matching rows. + + Returns: + An IncrementalAppendScan based on the table's current metadata and provided parameters. 
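+
+        Example:
+            A minimal, illustrative sketch; the snapshot IDs, filter, and column
+            names are placeholders, and ``table`` is assumed to be a loaded Table:
+
+                scan = table.incremental_append_scan(
+                    row_filter="amount > 0",
+                    from_snapshot_id_exclusive=123,
+                    to_snapshot_id_inclusive=456,
+                )
+                df = scan.select("id", "amount").to_pandas()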
+ """ + return IncrementalAppendScan( + table_metadata=self.metadata, + io=self.io, + row_filter=row_filter, + selected_fields=selected_fields, + case_sensitive=case_sensitive, + from_snapshot_id_exclusive=from_snapshot_id_exclusive, + to_snapshot_id_inclusive=to_snapshot_id_inclusive, + options=options, + limit=limit, + ) + @property def format_version(self) -> TableVersion: return self.metadata.format_version @@ -1630,16 +1688,17 @@ def _parse_row_filter(expr: Union[str, BooleanExpression]) -> BooleanExpression: return parser.parse(expr) if isinstance(expr, str) else expr -S = TypeVar("S", bound="TableScan", covariant=True) +A = TypeVar("A", bound="AbstractTableScan", covariant=True) + +class AbstractTableScan(ABC): + """A base class for all table scans.""" -class TableScan(ABC): table_metadata: TableMetadata io: FileIO row_filter: BooleanExpression selected_fields: Tuple[str, ...] case_sensitive: bool - snapshot_id: Optional[int] options: Properties limit: Optional[int] @@ -1650,7 +1709,6 @@ def __init__( row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, selected_fields: Tuple[str, ...] = ("*",), case_sensitive: bool = True, - snapshot_id: Optional[int] = None, options: Properties = EMPTY_DICT, limit: Optional[int] = None, ): @@ -1659,10 +1717,108 @@ def __init__( self.row_filter = _parse_row_filter(row_filter) self.selected_fields = selected_fields self.case_sensitive = case_sensitive - self.snapshot_id = snapshot_id self.options = options self.limit = limit + @abstractmethod + def projection(self) -> Schema: ... + + @abstractmethod + def plan_files(self) -> Iterable[ScanTask]: ... + + @abstractmethod + def to_arrow(self) -> pa.Table: ... + + def select(self: A, *field_names: str) -> A: + if "*" in self.selected_fields: + return self.update(selected_fields=field_names) + return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names)))) + + def filter(self: A, expr: Union[str, BooleanExpression]) -> A: + return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr))) + + def with_case_sensitive(self: A, case_sensitive: bool = True) -> A: + return self.update(case_sensitive=case_sensitive) + + def update(self: A, **overrides: Any) -> A: + """Create a copy of this table scan with updated fields.""" + from inspect import signature + + # Extract those attributes that are constructor parameters. We don't use self.__dict__ as the kwargs to the + # constructors because it may contain additional attributes that are not part of the constructor signature. + params = signature(type(self).__init__).parameters.keys() - {"self"} # Skip "self" parameter + kwargs = {param: getattr(self, param) for param in params} # Assume parameters are attributes + + return type(self)(**{**kwargs, **overrides}) + + def to_pandas(self, **kwargs: Any) -> pd.DataFrame: + """Read a Pandas DataFrame eagerly from this Iceberg table scan. + + Returns: + pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table scan + """ + return self.to_arrow().to_pandas(**kwargs) + + def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: + """Shorthand for loading this table scan in DuckDB. + + Returns: + DuckDBPyConnection: In memory DuckDB connection with the Iceberg table scan. 
+ """ + import duckdb + + con = connection or duckdb.connect(database=":memory:") + con.register(table_name, self.to_arrow()) + + return con + + def to_ray(self) -> ray.data.dataset.Dataset: + """Read a Ray Dataset eagerly from this Iceberg table scan. + + Returns: + ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table scan + """ + import ray + + return ray.data.from_arrow(self.to_arrow()) + + def to_polars(self) -> pl.DataFrame: + """Read a Polars DataFrame from this Iceberg table scan. + + Returns: + pl.DataFrame: Materialized Polars Dataframe from the Iceberg table scan + """ + import polars as pl + + result = pl.from_arrow(self.to_arrow()) + if isinstance(result, pl.Series): + result = result.to_frame() + + return result + + +S = TypeVar("S", bound="TableScan", covariant=True) + + +class TableScan(AbstractTableScan, ABC): + """A base class for table scans targeting a single snapshot.""" + + snapshot_id: Optional[int] + + def __init__( + self, + table_metadata: TableMetadata, + io: FileIO, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] = ("*",), + case_sensitive: bool = True, + snapshot_id: Optional[int] = None, + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + ): + super().__init__(table_metadata, io, row_filter, selected_fields, case_sensitive, options, limit) + self.snapshot_id = snapshot_id + def snapshot(self) -> Optional[Snapshot]: if self.snapshot_id: return self.table_metadata.snapshot_by_id(self.snapshot_id) @@ -1688,29 +1844,6 @@ def projection(self) -> Schema: return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive) - @abstractmethod - def plan_files(self) -> Iterable[ScanTask]: ... - - @abstractmethod - def to_arrow(self) -> pa.Table: ... - - @abstractmethod - def to_pandas(self, **kwargs: Any) -> pd.DataFrame: ... - - @abstractmethod - def to_polars(self) -> pl.DataFrame: ... - - def update(self: S, **overrides: Any) -> S: - """Create a copy of this table scan with updated fields.""" - from inspect import signature - - # Extract those attributes that are constructor parameters. We don't use self.__dict__ as the kwargs to the - # constructors because it may contain additional attributes that are not part of the constructor signature. - params = signature(type(self).__init__).parameters.keys() - {"self"} # Skip "self" parameter - kwargs = {param: getattr(self, param) for param in params} # Assume parameters are attributes - - return type(self)(**{**kwargs, **overrides}) - def use_ref(self: S, name: str) -> S: if self.snapshot_id: raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}") @@ -1719,17 +1852,6 @@ def use_ref(self: S, name: str) -> S: raise ValueError(f"Cannot scan unknown ref={name}") - def select(self: S, *field_names: str) -> S: - if "*" in self.selected_fields: - return self.update(selected_fields=field_names) - return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names)))) - - def filter(self: S, expr: Union[str, BooleanExpression]) -> S: - return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr))) - - def with_case_sensitive(self: S, case_sensitive: bool = True) -> S: - return self.update(case_sensitive=case_sensitive) - @abstractmethod def count(self) -> int: ... 
@@ -1819,76 +1941,19 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent class DataScan(TableScan): - def _build_partition_projection(self, spec_id: int) -> BooleanExpression: - project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive) - return project(self.row_filter) - @cached_property - def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: - return KeyDefaultDict(self._build_partition_projection) - - def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]: - spec = self.table_metadata.specs()[spec_id] - return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive) - - def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]: - spec = self.table_metadata.specs()[spec_id] - partition_type = spec.partition_type(self.table_metadata.schema()) - partition_schema = Schema(*partition_type.fields) - partition_expr = self.partition_filters[spec_id] - - # The lambda created here is run in multiple threads. - # So we avoid creating _EvaluatorExpression methods bound to a single - # shared instance across multiple threads. - return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition) - - def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]: - schema = self.table_metadata.schema() - include_empty_files = strtobool(self.options.get("include_empty_files", "false")) - - # The lambda created here is run in multiple threads. - # So we avoid creating _InclusiveMetricsEvaluator methods bound to a single - # shared instance across multiple threads. - return lambda data_file: _InclusiveMetricsEvaluator( - schema, - self.row_filter, - self.case_sensitive, - include_empty_files, - ).eval(data_file) - - def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]: - spec = self.table_metadata.specs()[spec_id] - - from pyiceberg.expressions.visitors import residual_evaluator_of - - # The lambda created here is run in multiple threads. - # So we avoid creating _EvaluatorExpression methods bound to a single - # shared instance across multiple threads. - return lambda datafile: ( - residual_evaluator_of( - spec=spec, - expr=self.row_filter, - case_sensitive=self.case_sensitive, - schema=self.table_metadata.schema(), - ) + def _manifest_planner(self) -> ManifestGroupPlanner: + return ManifestGroupPlanner( + table_metadata=self.table_metadata, + io=self.io, + row_filter=self.row_filter, + case_sensitive=self.case_sensitive, + options=self.options, ) - @staticmethod - def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool: - """Ensure that no manifests are loaded that contain deletes that are older than the data. - - Args: - min_sequence_number (int): The minimal sequence number. - manifest (ManifestFile): A ManifestFile that can be either data or deletes. - - Returns: - Boolean indicating if it is either a data file, or a relevant delete file. 
- """ - return manifest.content == ManifestContent.DATA or ( - # Not interested in deletes that are older than the data - manifest.content == ManifestContent.DELETES - and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number - ) + @property + def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: + return self._manifest_planner.partition_filters def plan_files(self) -> Iterable[FileScanTask]: """Plans the relevant files by filtering on the PartitionSpecs. @@ -1900,76 +1965,194 @@ def plan_files(self) -> Iterable[FileScanTask]: if not snapshot: return iter([]) - # step 1: filter manifests using partition summaries - # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id - - manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) + return self._manifest_planner.plan_files(manifests=snapshot.manifests(self.io)) - residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator) + def to_arrow(self) -> pa.Table: + """Read an Arrow table eagerly from this DataScan. - manifests = [ - manifest_file - for manifest_file in snapshot.manifests(self.io) - if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) - ] + All rows will be loaded into memory at once. - # step 2: filter the data files in each manifest - # this filter depends on the partition spec used to write the manifest file + Returns: + pa.Table: Materialized Arrow Table from the Iceberg table's DataScan + """ + from pyiceberg.io.pyarrow import ArrowScan - partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator) + return ArrowScan( + self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit + ).to_table(self.plan_files()) - min_sequence_number = _min_sequence_number(manifests) + def to_arrow_batch_reader(self) -> pa.RecordBatchReader: + """Return an Arrow RecordBatchReader from this DataScan. - data_entries: List[ManifestEntry] = [] - positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER) + For large results, using a RecordBatchReader requires less memory than + loading an Arrow Table for the same DataScan, because a RecordBatch + is read one at a time. - executor = ExecutorFactory.get_or_create() - for manifest_entry in chain( - *executor.map( - lambda args: _open_manifest(*args), - [ - ( - self.io, - manifest, - partition_evaluators[manifest.partition_spec_id], - self._build_metrics_evaluator(), - ) - for manifest in manifests - if self._check_sequence_number(min_sequence_number, manifest) - ], - ) - ): - data_file = manifest_entry.data_file - if data_file.content == DataFileContent.DATA: - data_entries.append(manifest_entry) - elif data_file.content == DataFileContent.POSITION_DELETES: - positional_delete_entries.add(manifest_entry) - elif data_file.content == DataFileContent.EQUALITY_DELETES: - raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568") + Returns: + pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan + which can be used to read a stream of record batches one by one. 
+ """ + import pyarrow as pa + + from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow + + target_schema = schema_to_pyarrow(self.projection()) + batches = ArrowScan( + self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit + ).to_record_batches(self.plan_files()) + + return pa.RecordBatchReader.from_batches( + target_schema, + batches, + ).cast(target_schema) + + def count(self) -> int: + from pyiceberg.io.pyarrow import ArrowScan + + # Usage: Calculates the total number of records in a Scan that haven't had positional deletes. + res = 0 + # every task is a FileScanTask + tasks = self.plan_files() + + for task in tasks: + # task.residual is a Boolean Expression if the filter condition is fully satisfied by the + # partition value and task.delete_files represents that positional delete haven't been merged yet + # hence those files have to read as a pyarrow table applying the filter and deletes + if task.residual == AlwaysTrue() and len(task.delete_files) == 0: + # Every File has a metadata stat that stores the file record count + res += task.file.record_count else: - raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}") + arrow_scan = ArrowScan( + table_metadata=self.table_metadata, + io=self.io, + projected_schema=self.projection(), + row_filter=self.row_filter, + case_sensitive=self.case_sensitive, + ) + tbl = arrow_scan.to_table([task]) + res += len(tbl) + return res - return [ - FileScanTask( - data_entry.data_file, - delete_files=_match_deletes_to_data_file( - data_entry, - positional_delete_entries, - ), - residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for( - data_entry.data_file.partition - ), - ) - for data_entry in data_entries - ] + +class IncrementalAppendScan(AbstractTableScan): + """An incremental scan of a table's data that accumulates appended data between two snapshots. + + Args: + row_filter: + A string or BooleanExpression that describes the + desired rows + selected_fields: + A tuple of strings representing the column names + to return in the output dataframe. + case_sensitive: + If True column matching is case sensitive + options: + Additional Table properties as a dictionary of + string key value pairs to use for this scan. + limit: + An integer representing the number of rows to + return in the scan result. If None, fetches all + matching rows. + from_snapshot_id_exclusive: + Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. When the scan is + ultimately planned, this must not be None. + to_snapshot_id_inclusive: + Optional ID of the "to" snapshot, to end the incremental scan at, inclusively. + Omitting it will default to the table's current snapshot. + """ + + from_snapshot_id_exclusive: Optional[int] + to_snapshot_id_inclusive: Optional[int] + + def __init__( + self, + table_metadata: TableMetadata, + io: FileIO, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] 
= ("*",), + case_sensitive: bool = True, + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + from_snapshot_id_exclusive: Optional[int] = None, + to_snapshot_id_inclusive: Optional[int] = None, + ): + super().__init__( + table_metadata, + io, + row_filter, + selected_fields, + case_sensitive, + options, + limit, + ) + self.from_snapshot_id_exclusive = from_snapshot_id_exclusive + self.to_snapshot_id_inclusive = to_snapshot_id_inclusive + + def from_snapshot_exclusive(self: A, from_snapshot_id_exclusive: Optional[int]) -> A: + """Instructs this scan to look for changes starting from a particular snapshot (exclusive). + + Args: + from_snapshot_id_exclusive: the start snapshot ID (exclusive) + + Returns: + this for method chaining + """ + return self.update(from_snapshot_id_exclusive=from_snapshot_id_exclusive) + + def to_snapshot_inclusive(self: A, to_snapshot_id_inclusive: Optional[int]) -> A: + """Instructs this scan to look for changes up to a particular snapshot (inclusive). + + Args: + to_snapshot_id_inclusive: the end snapshot ID (inclusive) + + Returns: + this for method chaining + """ + return self.update(to_snapshot_id_inclusive=to_snapshot_id_inclusive) + + def projection(self) -> Schema: + current_schema = self.table_metadata.schema() + + if "*" in self.selected_fields: + return current_schema + + return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive) + + def plan_files(self) -> Iterable[FileScanTask]: + from_snapshot_id, to_snapshot_id = self._validate_and_resolve_snapshots() + + append_snapshots: List[Snapshot] = self._appends_between(from_snapshot_id, to_snapshot_id, self.table_metadata) + if len(append_snapshots) == 0: + return iter([]) + + append_snapshot_ids: Set[int] = {snapshot.snapshot_id for snapshot in append_snapshots} + + manifests = { + manifest_file + for snapshot in append_snapshots + for manifest_file in snapshot.manifests(self.io) + if manifest_file.content == ManifestContent.DATA and manifest_file.added_snapshot_id in append_snapshot_ids + } + + return ManifestGroupPlanner( + table_metadata=self.table_metadata, + io=self.io, + row_filter=self.row_filter, + case_sensitive=self.case_sensitive, + options=self.options, + ).plan_files( + manifests=list(manifests), + manifest_entry_filter=lambda manifest_entry: manifest_entry.snapshot_id in append_snapshot_ids + and manifest_entry.status == ManifestEntryStatus.ADDED, + ) def to_arrow(self) -> pa.Table: - """Read an Arrow table eagerly from this DataScan. + """Read an Arrow table eagerly from this IncrementalAppendScan. All rows will be loaded into memory at once. Returns: - pa.Table: Materialized Arrow Table from the Iceberg table's DataScan + pa.Table: Materialized Arrow Table from the Iceberg table's IncrementalAppendScan """ from pyiceberg.io.pyarrow import ArrowScan @@ -1978,14 +2161,14 @@ def to_arrow(self) -> pa.Table: ).to_table(self.plan_files()) def to_arrow_batch_reader(self) -> pa.RecordBatchReader: - """Return an Arrow RecordBatchReader from this DataScan. + """Return an Arrow RecordBatchReader from this IncrementalAppendScan. For large results, using a RecordBatchReader requires less memory than - loading an Arrow Table for the same DataScan, because a RecordBatch + loading an Arrow Table for the same IncrementalAppendScan, because a RecordBatch is read one at a time. 
Returns: - pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan + pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's IncrementalAppendScan which can be used to read a stream of record batches one by one. """ import pyarrow as pa @@ -2002,77 +2185,210 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: batches, ).cast(target_schema) - def to_pandas(self, **kwargs: Any) -> pd.DataFrame: - """Read a Pandas DataFrame eagerly from this Iceberg table. + def _validate_and_resolve_snapshots(self) -> tuple[int, int]: + current_snapshot = self.table_metadata.current_snapshot() + to_snapshot_id = self.to_snapshot_id_inclusive - Returns: - pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table - """ - return self.to_arrow().to_pandas(**kwargs) + if self.from_snapshot_id_exclusive is None: + raise ValueError("Start snapshot of append scan unspecified, please set from_snapshot_id_exclusive") - def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: - """Shorthand for loading the Iceberg Table in DuckDB. + if to_snapshot_id is None: + if current_snapshot is None: + raise ValueError("End snapshot of append scan is not set and table has no current snapshot") - Returns: - DuckDBPyConnection: In memory DuckDB connection with the Iceberg table. - """ - import duckdb + to_snapshot_id = current_snapshot.snapshot_id + elif self._is_snapshot_missing(to_snapshot_id): + raise ValueError(f"End snapshot of append scan not found on table metadata: {to_snapshot_id}") - con = connection or duckdb.connect(database=":memory:") - con.register(table_name, self.to_arrow()) + if self._is_snapshot_missing(self.from_snapshot_id_exclusive): + raise ValueError(f"Start snapshot of append scan not found on table metadata: {self.from_snapshot_id_exclusive}") - return con + if not is_ancestor_of(to_snapshot_id, self.from_snapshot_id_exclusive, self.table_metadata): + raise ValueError( + f"Append scan's start snapshot {self.from_snapshot_id_exclusive} is not an ancestor of end snapshot {to_snapshot_id}" + ) - def to_ray(self) -> ray.data.dataset.Dataset: - """Read a Ray Dataset eagerly from this Iceberg table. + return self.from_snapshot_id_exclusive, to_snapshot_id - Returns: - ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table - """ - import ray + def _is_snapshot_missing(self, snapshot_id: int) -> bool: + """Return whether the snapshot ID is missing in the table metadata.""" + return self.table_metadata.snapshot_by_id(snapshot_id) is None - return ray.data.from_arrow(self.to_arrow()) + @staticmethod + def _appends_between( + from_snapshot_id_exclusive: int, to_snapshot_id_inclusive: int, table_metadata: TableMetadata + ) -> List[Snapshot]: + """Return the list of snapshots that are appends between two snapshot IDs.""" + return [ + snapshot + for snapshot in ancestors_between_ids( + from_snapshot_id_exclusive=from_snapshot_id_exclusive, + to_snapshot_id_inclusive=to_snapshot_id_inclusive, + table_metadata=table_metadata, + ) + if snapshot.summary is not None and snapshot.summary.operation == Operation.APPEND + ] - def to_polars(self) -> pl.DataFrame: - """Read a Polars DataFrame from this Iceberg table. 
- Returns: - pl.DataFrame: Materialized Polars Dataframe from the Iceberg table - """ - import polars as pl +class ManifestGroupPlanner: + io: FileIO + table_metadata: TableMetadata + row_filter: BooleanExpression + case_sensitive: bool + options: Properties - result = pl.from_arrow(self.to_arrow()) - if isinstance(result, pl.Series): - result = result.to_frame() + def __init__( + self, + table_metadata: TableMetadata, + io: FileIO, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + case_sensitive: bool = True, + options: Properties = EMPTY_DICT, + ): + self.table_metadata = table_metadata + self.io = io + self.row_filter = _parse_row_filter(row_filter) + self.case_sensitive = case_sensitive + self.options = options - return result + def plan_files( + self, + manifests: List[ManifestFile], + manifest_entry_filter: Callable[[ManifestEntry], bool] = lambda _: True, + ) -> Iterable[FileScanTask]: + # step 1: filter manifests using partition summaries + # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id - def count(self) -> int: - from pyiceberg.io.pyarrow import ArrowScan + manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) + manifests = [ + manifest_file for manifest_file in manifests if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) + ] - # Usage: Calculates the total number of records in a Scan that haven't had positional deletes. - res = 0 - # every task is a FileScanTask - tasks = self.plan_files() + residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator) - for task in tasks: - # task.residual is a Boolean Expression if the filter condition is fully satisfied by the - # partition value and task.delete_files represents that positional delete haven't been merged yet - # hence those files have to read as a pyarrow table applying the filter and deletes - if task.residual == AlwaysTrue() and len(task.delete_files) == 0: - # Every File has a metadata stat that stores the file record count - res += task.file.record_count + # step 2: filter the data files in each manifest + # this filter depends on the partition spec used to write the manifest file + + partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator) + + min_sequence_number = _min_sequence_number(manifests) + + data_entries: List[ManifestEntry] = [] + positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER) + + executor = ExecutorFactory.get_or_create() + for manifest_entry in chain( + *executor.map( + lambda args: _open_manifest(*args), + [ + ( + self.io, + manifest, + partition_evaluators[manifest.partition_spec_id], + self._build_metrics_evaluator(), + ) + for manifest in manifests + if self._check_sequence_number(min_sequence_number, manifest) + ], + ) + ): + if not manifest_entry_filter(manifest_entry): + continue + + data_file = manifest_entry.data_file + if data_file.content == DataFileContent.DATA: + data_entries.append(manifest_entry) + elif data_file.content == DataFileContent.POSITION_DELETES: + positional_delete_entries.add(manifest_entry) + elif data_file.content == DataFileContent.EQUALITY_DELETES: + raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568") else: - arrow_scan = ArrowScan( - table_metadata=self.table_metadata, - io=self.io, - 
projected_schema=self.projection(), - row_filter=self.row_filter, - case_sensitive=self.case_sensitive, - ) - tbl = arrow_scan.to_table([task]) - res += len(tbl) - return res + raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}") + + return [ + FileScanTask( + data_entry.data_file, + delete_files=_match_deletes_to_data_file( + data_entry, + positional_delete_entries, + ), + residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for( + data_entry.data_file.partition + ), + ) + for data_entry in data_entries + ] + + @cached_property + def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: + return KeyDefaultDict(self._build_partition_projection) + + def _build_partition_projection(self, spec_id: int) -> BooleanExpression: + project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive) + return project(self.row_filter) + + def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]: + spec = self.table_metadata.specs()[spec_id] + return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive) + + def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]: + spec = self.table_metadata.specs()[spec_id] + partition_type = spec.partition_type(self.table_metadata.schema()) + partition_schema = Schema(*partition_type.fields) + partition_expr = self.partition_filters[spec_id] + + # The lambda created here is run in multiple threads. + # So we avoid creating _EvaluatorExpression methods bound to a single + # shared instance across multiple threads. + return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition) + + def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]: + schema = self.table_metadata.schema() + include_empty_files = strtobool(self.options.get("include_empty_files", "false")) + + # The lambda created here is run in multiple threads. + # So we avoid creating _InclusiveMetricsEvaluator methods bound to a single + # shared instance across multiple threads. + return lambda data_file: _InclusiveMetricsEvaluator( + schema, + self.row_filter, + self.case_sensitive, + include_empty_files, + ).eval(data_file) + + def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]: + spec = self.table_metadata.specs()[spec_id] + + from pyiceberg.expressions.visitors import residual_evaluator_of + + # The lambda created here is run in multiple threads. + # So we avoid creating _EvaluatorExpression methods bound to a single + # shared instance across multiple threads. + return lambda datafile: ( + residual_evaluator_of( + spec=spec, + expr=self.row_filter, + case_sensitive=self.case_sensitive, + schema=self.table_metadata.schema(), + ) + ) + + @staticmethod + def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool: + """Ensure that no manifests are loaded that contain deletes that are older than the data. + + Args: + min_sequence_number (int): The minimal sequence number. + manifest (ManifestFile): A ManifestFile that can be either data or deletes. + + Returns: + Boolean indicating if it is either a data file, or a relevant delete file. 
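+
+        Example:
+            Illustrative values: with min_sequence_number=3, a DELETES manifest with
+            sequence_number 2 is skipped because its deletes predate all of the
+            selected data, while a DELETES manifest at 3 or higher is kept.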
+ """ + return manifest.content == ManifestContent.DATA or ( + # Not interested in deletes that are older than the data + manifest.content == ManifestContent.DELETES + and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number + ) @dataclass(frozen=True) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 60ad7219e1..dac504f2a5 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -451,3 +451,32 @@ def ancestors_between( break else: yield from ancestors_of(to_snapshot, table_metadata) + + +def ancestors_between_ids( + from_snapshot_id_exclusive: Optional[int], + to_snapshot_id_inclusive: int, + table_metadata: TableMetadata, +) -> Iterable[Snapshot]: + """Return the ancestors of and including the given "to" snapshot, up to but not including the "from" snapshot. + + If from_snapshot_id_exclusive is None or no ancestors of the "to" snapshot match it, all ancestors of the "to" + snapshot are returned. + """ + if from_snapshot_id_exclusive is not None: + for snapshot in ancestors_of(table_metadata.snapshot_by_id(to_snapshot_id_inclusive), table_metadata): + if snapshot.snapshot_id == from_snapshot_id_exclusive: + break + + yield snapshot + else: + yield from ancestors_of(table_metadata.snapshot_by_id(to_snapshot_id_inclusive), table_metadata) + + +def is_ancestor_of(snapshot_id: int, ancestor_snapshot_id: int, table_metadata: TableMetadata) -> bool: + """Return whether ancestor_snapshot_id is an ancestor of snapshot_id.""" + for snapshot in ancestors_of(table_metadata.snapshot_by_id(snapshot_id), table_metadata): + if snapshot.snapshot_id == ancestor_snapshot_id: + return True + + return False diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index d26562ad8f..6244fb1981 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -29,7 +29,9 @@ SnapshotSummaryCollector, Summary, ancestors_between, + ancestors_between_ids, ancestors_of, + is_ancestor_of, update_snapshot_summaries, ) from pyiceberg.transforms import IdentityTransform @@ -456,3 +458,43 @@ def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None: ) == 2000 ) + + +def test_is_ancestor_of(table_v2: Table) -> None: + snapshot_id, ancestor_snapshot_id = 3055729675574597004, 3051729675574597004 + + assert is_ancestor_of(snapshot_id, ancestor_snapshot_id, table_v2.metadata) + assert not is_ancestor_of(ancestor_snapshot_id, snapshot_id, table_v2.metadata) + + +def test_ancestors_between_ids(table_v2: Table) -> None: + snapshot_id, ancestor_snapshot_id = 3055729675574597004, 3051729675574597004 + + result = list(ancestors_between_ids(ancestor_snapshot_id, snapshot_id, table_v2.metadata)) + ids = [ancestor.snapshot_id for ancestor in result] + + # Exclusive-inclusive semantics means just 'snapshot_id' should be returned + assert ids == [snapshot_id] + + +def test_ancestors_between_equal_ids(table_v2: Table) -> None: + snapshot_id = 3055729675574597004 + + result = list(ancestors_between_ids(snapshot_id, snapshot_id, table_v2.metadata)) + + # Exclusive-inclusive semantics mean no ancestors should be returned + assert result == [] + + +def test_ancestors_between_ids_missing_from_snapshot(table_v2: Table) -> None: + snapshot_id, ancestor_snapshot_id = 3055729675574597004, 3051729675574597004 + + result = list( + ancestors_between_ids( + from_snapshot_id_exclusive=None, to_snapshot_id_inclusive=snapshot_id, table_metadata=table_v2.metadata + ) + ) + ids = [ancestor.snapshot_id for ancestor in result] + + 
+    # With from_snapshot_id_exclusive set to None, all ancestors should be returned
+    assert ids == [snapshot_id, ancestor_snapshot_id]
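+
+
+def test_is_ancestor_of_self(table_v2: Table) -> None:
+    snapshot_id = 3055729675574597004
+
+    # Illustrative addition: ancestors_of starts from the given snapshot itself
+    # (which is why ancestors_between_ids returns nothing for equal IDs above),
+    # so a snapshot is reported as an ancestor of itself.
+    assert is_ancestor_of(snapshot_id, snapshot_id, table_v2.metadata)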