Skip to content

[Append Scan] Extract manifest group planning into separate class #2232

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 165 additions & 129 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1819,76 +1819,19 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent


class DataScan(TableScan):
def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive)
return project(self.row_filter)

@cached_property
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
return KeyDefaultDict(self._build_partition_projection)

def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
spec = self.table_metadata.specs()[spec_id]
return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)

def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
spec = self.table_metadata.specs()[spec_id]
partition_type = spec.partition_type(self.table_metadata.schema())
partition_schema = Schema(*partition_type.fields)
partition_expr = self.partition_filters[spec_id]

# The lambda created here is run in multiple threads.
# So we avoid creating _EvaluatorExpression methods bound to a single
# shared instance across multiple threads.
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)

def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
schema = self.table_metadata.schema()
include_empty_files = strtobool(self.options.get("include_empty_files", "false"))

# The lambda created here is run in multiple threads.
# So we avoid creating _InclusiveMetricsEvaluator methods bound to a single
# shared instance across multiple threads.
return lambda data_file: _InclusiveMetricsEvaluator(
schema,
self.row_filter,
self.case_sensitive,
include_empty_files,
).eval(data_file)

def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
spec = self.table_metadata.specs()[spec_id]

from pyiceberg.expressions.visitors import residual_evaluator_of

# The lambda created here is run in multiple threads.
# So we avoid creating _EvaluatorExpression methods bound to a single
# shared instance across multiple threads.
return lambda datafile: (
residual_evaluator_of(
spec=spec,
expr=self.row_filter,
case_sensitive=self.case_sensitive,
schema=self.table_metadata.schema(),
)
def _manifest_planner(self) -> ManifestGroupPlanner:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also be a field on the class set in the constructor. Kept the diff smaller here, but happy to change

return ManifestGroupPlanner(
table_metadata=self.table_metadata,
io=self.io,
row_filter=self.row_filter,
case_sensitive=self.case_sensitive,
options=self.options,
)

@staticmethod
def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool:
"""Ensure that no manifests are loaded that contain deletes that are older than the data.

Args:
min_sequence_number (int): The minimal sequence number.
manifest (ManifestFile): A ManifestFile that can be either data or deletes.

Returns:
Boolean indicating if it is either a data file, or a relevant delete file.
"""
return manifest.content == ManifestContent.DATA or (
# Not interested in deletes that are older than the data
manifest.content == ManifestContent.DELETES
and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
)
@property
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
return self._manifest_planner.partition_filters
Comment on lines +1832 to +1834
Copy link
Contributor Author

@smaheshwar-pltr smaheshwar-pltr Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping this public method around to not introduce a breaking change to DataScan. On the other hand, the private methods have been moved into ManifestGroupPlanner. Technically, that could still break users subclassing DataScan and calling the removed methods in the subclass (and also users accessing those private methods, but that feels more OK to break than the subclassing case).

I'm not familiar with PyIceberg breaks / deprecations - would it be fine to remove these private methods or is a deprecation cycle still required?


def plan_files(self) -> Iterable[FileScanTask]:
"""Plans the relevant files by filtering on the PartitionSpecs.
Expand All @@ -1900,68 +1843,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
if not snapshot:
return iter([])

# step 1: filter manifests using partition summaries
# the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id

manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)

residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator)

manifests = [
manifest_file
for manifest_file in snapshot.manifests(self.io)
if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
]

# step 2: filter the data files in each manifest
# this filter depends on the partition spec used to write the manifest file

partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)

min_sequence_number = _min_sequence_number(manifests)

data_entries: List[ManifestEntry] = []
positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)

executor = ExecutorFactory.get_or_create()
for manifest_entry in chain(
*executor.map(
lambda args: _open_manifest(*args),
[
(
self.io,
manifest,
partition_evaluators[manifest.partition_spec_id],
self._build_metrics_evaluator(),
)
for manifest in manifests
if self._check_sequence_number(min_sequence_number, manifest)
],
)
):
data_file = manifest_entry.data_file
if data_file.content == DataFileContent.DATA:
data_entries.append(manifest_entry)
elif data_file.content == DataFileContent.POSITION_DELETES:
positional_delete_entries.add(manifest_entry)
elif data_file.content == DataFileContent.EQUALITY_DELETES:
raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
else:
raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")

return [
FileScanTask(
data_entry.data_file,
delete_files=_match_deletes_to_data_file(
data_entry,
positional_delete_entries,
),
residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
data_entry.data_file.partition
),
)
for data_entry in data_entries
]
return self._manifest_planner.plan_files(manifests=snapshot.manifests(self.io))

def to_arrow(self) -> pa.Table:
"""Read an Arrow table eagerly from this DataScan.
Expand Down Expand Up @@ -2075,6 +1957,160 @@ def count(self) -> int:
return res


class ManifestGroupPlanner:
Copy link
Contributor Author

@smaheshwar-pltr smaheshwar-pltr Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's also to share this logic between scans like DataScan and IncrementalAppendScan, that both use this flow.

io: FileIO
table_metadata: TableMetadata
row_filter: BooleanExpression
case_sensitive: bool
options: Properties

def __init__(
self,
table_metadata: TableMetadata,
io: FileIO,
row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
case_sensitive: bool = True,
options: Properties = EMPTY_DICT,
):
self.table_metadata = table_metadata
self.io = io
self.row_filter = _parse_row_filter(row_filter)
self.case_sensitive = case_sensitive
self.options = options

def plan_files(self, manifests: List[ManifestFile]) -> Iterable[FileScanTask]:
# step 1: filter manifests using partition summaries
# the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id

manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
manifests = [
manifest_file for manifest_file in manifests if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
]

residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator)

# step 2: filter the data files in each manifest
# this filter depends on the partition spec used to write the manifest file

partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)

min_sequence_number = _min_sequence_number(manifests)

data_entries: List[ManifestEntry] = []
positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)

executor = ExecutorFactory.get_or_create()
for manifest_entry in chain(
*executor.map(
lambda args: _open_manifest(*args),
[
(
self.io,
manifest,
partition_evaluators[manifest.partition_spec_id],
self._build_metrics_evaluator(),
)
for manifest in manifests
if self._check_sequence_number(min_sequence_number, manifest)
],
)
):
data_file = manifest_entry.data_file
if data_file.content == DataFileContent.DATA:
data_entries.append(manifest_entry)
elif data_file.content == DataFileContent.POSITION_DELETES:
positional_delete_entries.add(manifest_entry)
elif data_file.content == DataFileContent.EQUALITY_DELETES:
raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
else:
raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")

return [
FileScanTask(
data_entry.data_file,
delete_files=_match_deletes_to_data_file(
data_entry,
positional_delete_entries,
),
residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
data_entry.data_file.partition
),
)
for data_entry in data_entries
]

@cached_property
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
return KeyDefaultDict(self._build_partition_projection)

def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive)
return project(self.row_filter)

def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
spec = self.table_metadata.specs()[spec_id]
return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)

def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
spec = self.table_metadata.specs()[spec_id]
partition_type = spec.partition_type(self.table_metadata.schema())
partition_schema = Schema(*partition_type.fields)
partition_expr = self.partition_filters[spec_id]

# The lambda created here is run in multiple threads.
# So we avoid creating _EvaluatorExpression methods bound to a single
# shared instance across multiple threads.
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)

def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
schema = self.table_metadata.schema()
include_empty_files = strtobool(self.options.get("include_empty_files", "false"))

# The lambda created here is run in multiple threads.
# So we avoid creating _InclusiveMetricsEvaluator methods bound to a single
# shared instance across multiple threads.
return lambda data_file: _InclusiveMetricsEvaluator(
schema,
self.row_filter,
self.case_sensitive,
include_empty_files,
).eval(data_file)

def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
spec = self.table_metadata.specs()[spec_id]

from pyiceberg.expressions.visitors import residual_evaluator_of

# The lambda created here is run in multiple threads.
# So we avoid creating _EvaluatorExpression methods bound to a single
# shared instance across multiple threads.
return lambda datafile: (
residual_evaluator_of(
spec=spec,
expr=self.row_filter,
case_sensitive=self.case_sensitive,
schema=self.table_metadata.schema(),
)
)

@staticmethod
def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool:
"""Ensure that no manifests are loaded that contain deletes that are older than the data.

Args:
min_sequence_number (int): The minimal sequence number.
manifest (ManifestFile): A ManifestFile that can be either data or deletes.

Returns:
Boolean indicating if it is either a data file, or a relevant delete file.
"""
return manifest.content == ManifestContent.DATA or (
# Not interested in deletes that are older than the data
manifest.content == ManifestContent.DELETES
and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
)


@dataclass(frozen=True)
class WriteTask:
"""Task with the parameters for writing a DataFile."""
Expand Down