-
Notifications
You must be signed in to change notification settings - Fork 343
[Append Scan] Extract manifest group planning into separate class #2232
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1819,76 +1819,19 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent | |
|
||
|
||
class DataScan(TableScan): | ||
def _build_partition_projection(self, spec_id: int) -> BooleanExpression: | ||
project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive) | ||
return project(self.row_filter) | ||
|
||
@cached_property | ||
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: | ||
return KeyDefaultDict(self._build_partition_projection) | ||
|
||
def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]: | ||
spec = self.table_metadata.specs()[spec_id] | ||
return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive) | ||
|
||
def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]: | ||
spec = self.table_metadata.specs()[spec_id] | ||
partition_type = spec.partition_type(self.table_metadata.schema()) | ||
partition_schema = Schema(*partition_type.fields) | ||
partition_expr = self.partition_filters[spec_id] | ||
|
||
# The lambda created here is run in multiple threads. | ||
# So we avoid creating _EvaluatorExpression methods bound to a single | ||
# shared instance across multiple threads. | ||
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition) | ||
|
||
def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]: | ||
schema = self.table_metadata.schema() | ||
include_empty_files = strtobool(self.options.get("include_empty_files", "false")) | ||
|
||
# The lambda created here is run in multiple threads. | ||
# So we avoid creating _InclusiveMetricsEvaluator methods bound to a single | ||
# shared instance across multiple threads. | ||
return lambda data_file: _InclusiveMetricsEvaluator( | ||
schema, | ||
self.row_filter, | ||
self.case_sensitive, | ||
include_empty_files, | ||
).eval(data_file) | ||
|
||
def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]: | ||
spec = self.table_metadata.specs()[spec_id] | ||
|
||
from pyiceberg.expressions.visitors import residual_evaluator_of | ||
|
||
# The lambda created here is run in multiple threads. | ||
# So we avoid creating _EvaluatorExpression methods bound to a single | ||
# shared instance across multiple threads. | ||
return lambda datafile: ( | ||
residual_evaluator_of( | ||
spec=spec, | ||
expr=self.row_filter, | ||
case_sensitive=self.case_sensitive, | ||
schema=self.table_metadata.schema(), | ||
) | ||
def _manifest_planner(self) -> ManifestGroupPlanner: | ||
return ManifestGroupPlanner( | ||
table_metadata=self.table_metadata, | ||
io=self.io, | ||
row_filter=self.row_filter, | ||
case_sensitive=self.case_sensitive, | ||
options=self.options, | ||
) | ||
|
||
@staticmethod | ||
def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool: | ||
"""Ensure that no manifests are loaded that contain deletes that are older than the data. | ||
|
||
Args: | ||
min_sequence_number (int): The minimal sequence number. | ||
manifest (ManifestFile): A ManifestFile that can be either data or deletes. | ||
|
||
Returns: | ||
Boolean indicating if it is either a data file, or a relevant delete file. | ||
""" | ||
return manifest.content == ManifestContent.DATA or ( | ||
# Not interested in deletes that are older than the data | ||
manifest.content == ManifestContent.DELETES | ||
and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number | ||
) | ||
@property | ||
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: | ||
return self._manifest_planner.partition_filters | ||
Comment on lines
+1832
to
+1834
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Keeping this public method around to not introduce a breaking change to I'm not familiar with PyIceberg breaks / deprecations - would it be fine to remove these private methods or is a deprecation cycle still required? |
||
|
||
def plan_files(self) -> Iterable[FileScanTask]: | ||
"""Plans the relevant files by filtering on the PartitionSpecs. | ||
|
@@ -1900,68 +1843,7 @@ def plan_files(self) -> Iterable[FileScanTask]: | |
if not snapshot: | ||
return iter([]) | ||
|
||
# step 1: filter manifests using partition summaries | ||
# the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id | ||
|
||
manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) | ||
|
||
residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator) | ||
|
||
manifests = [ | ||
manifest_file | ||
for manifest_file in snapshot.manifests(self.io) | ||
if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) | ||
] | ||
|
||
# step 2: filter the data files in each manifest | ||
# this filter depends on the partition spec used to write the manifest file | ||
|
||
partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator) | ||
|
||
min_sequence_number = _min_sequence_number(manifests) | ||
|
||
data_entries: List[ManifestEntry] = [] | ||
positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER) | ||
|
||
executor = ExecutorFactory.get_or_create() | ||
for manifest_entry in chain( | ||
*executor.map( | ||
lambda args: _open_manifest(*args), | ||
[ | ||
( | ||
self.io, | ||
manifest, | ||
partition_evaluators[manifest.partition_spec_id], | ||
self._build_metrics_evaluator(), | ||
) | ||
for manifest in manifests | ||
if self._check_sequence_number(min_sequence_number, manifest) | ||
], | ||
) | ||
): | ||
data_file = manifest_entry.data_file | ||
if data_file.content == DataFileContent.DATA: | ||
data_entries.append(manifest_entry) | ||
elif data_file.content == DataFileContent.POSITION_DELETES: | ||
positional_delete_entries.add(manifest_entry) | ||
elif data_file.content == DataFileContent.EQUALITY_DELETES: | ||
raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568") | ||
else: | ||
raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}") | ||
|
||
return [ | ||
FileScanTask( | ||
data_entry.data_file, | ||
delete_files=_match_deletes_to_data_file( | ||
data_entry, | ||
positional_delete_entries, | ||
), | ||
residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for( | ||
data_entry.data_file.partition | ||
), | ||
) | ||
for data_entry in data_entries | ||
] | ||
return self._manifest_planner.plan_files(manifests=snapshot.manifests(self.io)) | ||
|
||
def to_arrow(self) -> pa.Table: | ||
"""Read an Arrow table eagerly from this DataScan. | ||
|
@@ -2075,6 +1957,160 @@ def count(self) -> int: | |
return res | ||
|
||
|
||
class ManifestGroupPlanner: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The motivation for a manifest-based file scan task planner comes from the Java-side https://github.com/apache/iceberg/blob/1911c94ea605a3d3f10a1994b046f00a5e9fdceb/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java#L76-L97 (class here). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's also to share this logic between scans like |
||
io: FileIO | ||
table_metadata: TableMetadata | ||
row_filter: BooleanExpression | ||
case_sensitive: bool | ||
options: Properties | ||
|
||
def __init__( | ||
self, | ||
table_metadata: TableMetadata, | ||
io: FileIO, | ||
row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, | ||
case_sensitive: bool = True, | ||
options: Properties = EMPTY_DICT, | ||
): | ||
self.table_metadata = table_metadata | ||
self.io = io | ||
self.row_filter = _parse_row_filter(row_filter) | ||
self.case_sensitive = case_sensitive | ||
self.options = options | ||
|
||
def plan_files(self, manifests: List[ManifestFile]) -> Iterable[FileScanTask]: | ||
# step 1: filter manifests using partition summaries | ||
# the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id | ||
|
||
manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) | ||
manifests = [ | ||
manifest_file for manifest_file in manifests if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) | ||
] | ||
|
||
residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator) | ||
|
||
# step 2: filter the data files in each manifest | ||
# this filter depends on the partition spec used to write the manifest file | ||
|
||
partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator) | ||
|
||
min_sequence_number = _min_sequence_number(manifests) | ||
|
||
data_entries: List[ManifestEntry] = [] | ||
positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER) | ||
|
||
executor = ExecutorFactory.get_or_create() | ||
for manifest_entry in chain( | ||
*executor.map( | ||
lambda args: _open_manifest(*args), | ||
[ | ||
( | ||
self.io, | ||
manifest, | ||
partition_evaluators[manifest.partition_spec_id], | ||
self._build_metrics_evaluator(), | ||
) | ||
for manifest in manifests | ||
if self._check_sequence_number(min_sequence_number, manifest) | ||
], | ||
) | ||
): | ||
data_file = manifest_entry.data_file | ||
if data_file.content == DataFileContent.DATA: | ||
data_entries.append(manifest_entry) | ||
elif data_file.content == DataFileContent.POSITION_DELETES: | ||
positional_delete_entries.add(manifest_entry) | ||
elif data_file.content == DataFileContent.EQUALITY_DELETES: | ||
raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568") | ||
else: | ||
raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}") | ||
|
||
return [ | ||
FileScanTask( | ||
data_entry.data_file, | ||
delete_files=_match_deletes_to_data_file( | ||
data_entry, | ||
positional_delete_entries, | ||
), | ||
residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for( | ||
data_entry.data_file.partition | ||
), | ||
) | ||
for data_entry in data_entries | ||
] | ||
|
||
@cached_property | ||
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]: | ||
return KeyDefaultDict(self._build_partition_projection) | ||
|
||
def _build_partition_projection(self, spec_id: int) -> BooleanExpression: | ||
project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive) | ||
return project(self.row_filter) | ||
|
||
def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]: | ||
spec = self.table_metadata.specs()[spec_id] | ||
return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive) | ||
|
||
def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]: | ||
spec = self.table_metadata.specs()[spec_id] | ||
partition_type = spec.partition_type(self.table_metadata.schema()) | ||
partition_schema = Schema(*partition_type.fields) | ||
partition_expr = self.partition_filters[spec_id] | ||
|
||
# The lambda created here is run in multiple threads. | ||
# So we avoid creating _EvaluatorExpression methods bound to a single | ||
# shared instance across multiple threads. | ||
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition) | ||
|
||
def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]: | ||
schema = self.table_metadata.schema() | ||
include_empty_files = strtobool(self.options.get("include_empty_files", "false")) | ||
|
||
# The lambda created here is run in multiple threads. | ||
# So we avoid creating _InclusiveMetricsEvaluator methods bound to a single | ||
# shared instance across multiple threads. | ||
return lambda data_file: _InclusiveMetricsEvaluator( | ||
schema, | ||
self.row_filter, | ||
self.case_sensitive, | ||
include_empty_files, | ||
).eval(data_file) | ||
|
||
def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]: | ||
spec = self.table_metadata.specs()[spec_id] | ||
|
||
from pyiceberg.expressions.visitors import residual_evaluator_of | ||
|
||
# The lambda created here is run in multiple threads. | ||
# So we avoid creating _EvaluatorExpression methods bound to a single | ||
# shared instance across multiple threads. | ||
return lambda datafile: ( | ||
residual_evaluator_of( | ||
spec=spec, | ||
expr=self.row_filter, | ||
case_sensitive=self.case_sensitive, | ||
schema=self.table_metadata.schema(), | ||
) | ||
) | ||
|
||
@staticmethod | ||
def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool: | ||
"""Ensure that no manifests are loaded that contain deletes that are older than the data. | ||
|
||
Args: | ||
min_sequence_number (int): The minimal sequence number. | ||
manifest (ManifestFile): A ManifestFile that can be either data or deletes. | ||
|
||
Returns: | ||
Boolean indicating if it is either a data file, or a relevant delete file. | ||
""" | ||
return manifest.content == ManifestContent.DATA or ( | ||
# Not interested in deletes that are older than the data | ||
manifest.content == ManifestContent.DELETES | ||
and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number | ||
) | ||
|
||
|
||
@dataclass(frozen=True) | ||
class WriteTask: | ||
"""Task with the parameters for writing a DataFile.""" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could also be a field on the class set in the constructor. Kept the diff smaller here, but happy to change