[Append Scan] Introduce IncrementalAppendScan class (without integration tests) #2234
append_snapshot_ids: Set[int] = {snapshot.snapshot_id for snapshot in append_snapshots}

manifests = {
@@ -1139,6 +1143,60 @@ def scan(
limit=limit,
)

def incremental_append_scan(
Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. This can be set
on the IncrementalAppendScan object returned, but ultimately must not be None.
return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)

def plan_files(self) -> Iterable[FileScanTask]:
from_snapshot_id, to_snapshot_id = self._validate_and_resolve_snapshots()
).plan_files(
manifests=list(manifests),
manifest_entry_filter=lambda manifest_entry: manifest_entry.snapshot_id in append_snapshot_ids
and manifest_entry.status == ManifestEntryStatus.ADDED,
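For readers following the filter in the snippet above, here is a minimal, self-contained sketch of the same predicate: keep a manifest entry only if it was written by one of the append snapshots and its status is ADDED. Entry, EntryStatus, make_append_filter and appended_files are hypothetical stand-ins written for illustration; they are not PyIceberg classes or functions, and the enum values are illustrative only.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Iterable, List, Set


class EntryStatus(Enum):
    # Hypothetical stand-in for ManifestEntryStatus; values are illustrative.
    EXISTING = 0
    ADDED = 1
    DELETED = 2


@dataclass(frozen=True)
class Entry:
    # Hypothetical stand-in for a manifest entry.
    snapshot_id: int
    status: EntryStatus
    file_path: str


def make_append_filter(append_snapshot_ids: Set[int]) -> Callable[[Entry], bool]:
    """Build a predicate equivalent to the lambda in the diff."""

    def keep(entry: Entry) -> bool:
        # Both conditions must hold: written by an append snapshot in range,
        # and recorded as newly added (not carried over or deleted).
        return entry.snapshot_id in append_snapshot_ids and entry.status == EntryStatus.ADDED

    return keep


def appended_files(entries: Iterable[Entry], append_snapshot_ids: Set[int]) -> List[str]:
    """Return the paths of entries that pass the append filter, in input order."""
    keep = make_append_filter(append_snapshot_ids)
    return [entry.file_path for entry in entries if keep(entry)]


entries = [
    Entry(10, EntryStatus.EXISTING, "a.parquet"),  # outside the range, carried over
    Entry(11, EntryStatus.ADDED, "b.parquet"),     # appended inside the range
    Entry(12, EntryStatus.DELETED, "c.parquet"),   # in range, but a delete
    Entry(12, EntryStatus.ADDED, "d.parquet"),     # appended inside the range
]

print(appended_files(entries, {11, 12}))
```

Checking the snapshot ID as well as the status matters because a manifest reachable from the "to" snapshot can still hold ADDED entries written by snapshots outside the requested range.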
Note: Contains changes from:
- AbstractTableScan with default methods #2230
- __eq__ and __hash__ methods to ManifestFile #2233
Smaller diff from those changes: smaheshwar-pltr#5.
Rationale for this change
Split out from the incremental append scan work; see #2031 (comment). Unlike Spark, PyIceberg doesn't support incremental reads of data appended between snapshots.
This PR adds the IncrementalAppendScan class and the API for constructing it on pyiceberg.Table.
Are these changes tested?
Integration tests are separated into a different PR - #2235, to keep this one small.
Are there any user-facing changes?
Ignoring the other PRs, there's a new scan class and a new method on Table.
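To illustrate what an incremental append scan has to select, here is a rough sketch of walking the snapshot lineage from the "to" snapshot back to the "from" snapshot and keeping only append operations. It is not the code in this PR: Snap and append_snapshots_between are hypothetical names, the "from" bound is exclusive as stated in the docstring under review, and treating the "to" bound as inclusive is an assumption.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Snap:
    # Hypothetical stand-in for a table snapshot.
    snapshot_id: int
    parent_id: Optional[int]
    operation: str  # e.g. "append", "overwrite", "delete"


def append_snapshots_between(
    snapshots: Dict[int, Snap],
    from_id_exclusive: int,
    to_id_inclusive: int,
) -> List[Snap]:
    """Collect append snapshots in (from, to], oldest first.

    Assumption: the "to" bound is inclusive. Raises ValueError if the
    "from" snapshot is not an ancestor of (or equal to) the "to" snapshot.
    """
    selected: List[Snap] = []
    current = snapshots.get(to_id_inclusive)
    # Walk parent pointers until we hit the exclusive lower bound.
    while current is not None and current.snapshot_id != from_id_exclusive:
        if current.operation == "append":
            selected.append(current)
        parent_id = current.parent_id
        current = snapshots.get(parent_id) if parent_id is not None else None
    if current is None:
        raise ValueError(
            f"Snapshot {from_id_exclusive} is not an ancestor of {to_id_inclusive}"
        )
    selected.reverse()  # walked newest to oldest, so flip to oldest first
    return selected


history = {
    1: Snap(1, None, "append"),
    2: Snap(2, 1, "append"),
    3: Snap(3, 2, "overwrite"),
    4: Snap(4, 3, "append"),
}

print([s.snapshot_id for s in append_snapshots_between(history, 1, 4)])
```

In this sketch, snapshot 3 is skipped because it is an overwrite, and snapshot 1 is skipped because the lower bound is exclusive.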