diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 6c6da2a9b7..fd00dc4d52 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1819,76 +1819,19 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent class DataScan(TableScan): - def _build_partition_projection(self, spec_id: int) -> BooleanExpression: - project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive) - return project(self.row_filter) - @cached_property - def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: - return KeyDefaultDict(self._build_partition_projection) - - def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]: - spec = self.table_metadata.specs()[spec_id] - return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive) - - def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]: - spec = self.table_metadata.specs()[spec_id] - partition_type = spec.partition_type(self.table_metadata.schema()) - partition_schema = Schema(*partition_type.fields) - partition_expr = self.partition_filters[spec_id] - - # The lambda created here is run in multiple threads. - # So we avoid creating _EvaluatorExpression methods bound to a single - # shared instance across multiple threads. - return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition) - - def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]: - schema = self.table_metadata.schema() - include_empty_files = strtobool(self.options.get("include_empty_files", "false")) - - # The lambda created here is run in multiple threads. - # So we avoid creating _InclusiveMetricsEvaluator methods bound to a single - # shared instance across multiple threads. - return lambda data_file: _InclusiveMetricsEvaluator( - schema, - self.row_filter, - self.case_sensitive, - include_empty_files, - ).eval(data_file) - - def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]: - spec = self.table_metadata.specs()[spec_id] - - from pyiceberg.expressions.visitors import residual_evaluator_of - - # The lambda created here is run in multiple threads. - # So we avoid creating _EvaluatorExpression methods bound to a single - # shared instance across multiple threads. - return lambda datafile: ( - residual_evaluator_of( - spec=spec, - expr=self.row_filter, - case_sensitive=self.case_sensitive, - schema=self.table_metadata.schema(), - ) + def _manifest_planner(self) -> ManifestGroupPlanner: + return ManifestGroupPlanner( + table_metadata=self.table_metadata, + io=self.io, + row_filter=self.row_filter, + case_sensitive=self.case_sensitive, + options=self.options, ) - @staticmethod - def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool: - """Ensure that no manifests are loaded that contain deletes that are older than the data. - - Args: - min_sequence_number (int): The minimal sequence number. - manifest (ManifestFile): A ManifestFile that can be either data or deletes. - - Returns: - Boolean indicating if it is either a data file, or a relevant delete file. - """ - return manifest.content == ManifestContent.DATA or ( - # Not interested in deletes that are older than the data - manifest.content == ManifestContent.DELETES - and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number - ) + @property + def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: + return self._manifest_planner.partition_filters def plan_files(self) -> Iterable[FileScanTask]: """Plans the relevant files by filtering on the PartitionSpecs. @@ -1900,68 +1843,7 @@ def plan_files(self) -> Iterable[FileScanTask]: if not snapshot: return iter([]) - # step 1: filter manifests using partition summaries - # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id - - manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) - - residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator) - - manifests = [ - manifest_file - for manifest_file in snapshot.manifests(self.io) - if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) - ] - - # step 2: filter the data files in each manifest - # this filter depends on the partition spec used to write the manifest file - - partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator) - - min_sequence_number = _min_sequence_number(manifests) - - data_entries: List[ManifestEntry] = [] - positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER) - - executor = ExecutorFactory.get_or_create() - for manifest_entry in chain( - *executor.map( - lambda args: _open_manifest(*args), - [ - ( - self.io, - manifest, - partition_evaluators[manifest.partition_spec_id], - self._build_metrics_evaluator(), - ) - for manifest in manifests - if self._check_sequence_number(min_sequence_number, manifest) - ], - ) - ): - data_file = manifest_entry.data_file - if data_file.content == DataFileContent.DATA: - data_entries.append(manifest_entry) - elif data_file.content == DataFileContent.POSITION_DELETES: - positional_delete_entries.add(manifest_entry) - elif data_file.content == DataFileContent.EQUALITY_DELETES: - raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568") - else: - raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}") - - return [ - FileScanTask( - data_entry.data_file, - delete_files=_match_deletes_to_data_file( - data_entry, - positional_delete_entries, - ), - residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for( - data_entry.data_file.partition - ), - ) - for data_entry in data_entries - ] + return self._manifest_planner.plan_files(manifests=snapshot.manifests(self.io)) def to_arrow(self) -> pa.Table: """Read an Arrow table eagerly from this DataScan. @@ -2075,6 +1957,160 @@ def count(self) -> int: return res +class ManifestGroupPlanner: + io: FileIO + table_metadata: TableMetadata + row_filter: BooleanExpression + case_sensitive: bool + options: Properties + + def __init__( + self, + table_metadata: TableMetadata, + io: FileIO, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + case_sensitive: bool = True, + options: Properties = EMPTY_DICT, + ): + self.table_metadata = table_metadata + self.io = io + self.row_filter = _parse_row_filter(row_filter) + self.case_sensitive = case_sensitive + self.options = options + + def plan_files(self, manifests: List[ManifestFile]) -> Iterable[FileScanTask]: + # step 1: filter manifests using partition summaries + # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id + + manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) + manifests = [ + manifest_file for manifest_file in manifests if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) + ] + + residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator) + + # step 2: filter the data files in each manifest + # this filter depends on the partition spec used to write the manifest file + + partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator) + + min_sequence_number = _min_sequence_number(manifests) + + data_entries: List[ManifestEntry] = [] + positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER) + + executor = ExecutorFactory.get_or_create() + for manifest_entry in chain( + *executor.map( + lambda args: _open_manifest(*args), + [ + ( + self.io, + manifest, + partition_evaluators[manifest.partition_spec_id], + self._build_metrics_evaluator(), + ) + for manifest in manifests + if self._check_sequence_number(min_sequence_number, manifest) + ], + ) + ): + data_file = manifest_entry.data_file + if data_file.content == DataFileContent.DATA: + data_entries.append(manifest_entry) + elif data_file.content == DataFileContent.POSITION_DELETES: + positional_delete_entries.add(manifest_entry) + elif data_file.content == DataFileContent.EQUALITY_DELETES: + raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568") + else: + raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}") + + return [ + FileScanTask( + data_entry.data_file, + delete_files=_match_deletes_to_data_file( + data_entry, + positional_delete_entries, + ), + residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for( + data_entry.data_file.partition + ), + ) + for data_entry in data_entries + ] + + @cached_property + def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: + return KeyDefaultDict(self._build_partition_projection) + + def _build_partition_projection(self, spec_id: int) -> BooleanExpression: + project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive) + return project(self.row_filter) + + def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]: + spec = self.table_metadata.specs()[spec_id] + return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive) + + def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]: + spec = self.table_metadata.specs()[spec_id] + partition_type = spec.partition_type(self.table_metadata.schema()) + partition_schema = Schema(*partition_type.fields) + partition_expr = self.partition_filters[spec_id] + + # The lambda created here is run in multiple threads. + # So we avoid creating _EvaluatorExpression methods bound to a single + # shared instance across multiple threads. + return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition) + + def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]: + schema = self.table_metadata.schema() + include_empty_files = strtobool(self.options.get("include_empty_files", "false")) + + # The lambda created here is run in multiple threads. + # So we avoid creating _InclusiveMetricsEvaluator methods bound to a single + # shared instance across multiple threads. + return lambda data_file: _InclusiveMetricsEvaluator( + schema, + self.row_filter, + self.case_sensitive, + include_empty_files, + ).eval(data_file) + + def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]: + spec = self.table_metadata.specs()[spec_id] + + from pyiceberg.expressions.visitors import residual_evaluator_of + + # The lambda created here is run in multiple threads. + # So we avoid creating _EvaluatorExpression methods bound to a single + # shared instance across multiple threads. + return lambda datafile: ( + residual_evaluator_of( + spec=spec, + expr=self.row_filter, + case_sensitive=self.case_sensitive, + schema=self.table_metadata.schema(), + ) + ) + + @staticmethod + def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool: + """Ensure that no manifests are loaded that contain deletes that are older than the data. + + Args: + min_sequence_number (int): The minimal sequence number. + manifest (ManifestFile): A ManifestFile that can be either data or deletes. + + Returns: + Boolean indicating if it is either a data file, or a relevant delete file. + """ + return manifest.content == ManifestContent.DATA or ( + # Not interested in deletes that are older than the data + manifest.content == ManifestContent.DELETES + and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number + ) + + @dataclass(frozen=True) class WriteTask: """Task with the parameters for writing a DataFile."""