Skip to content

Commit 3ac5b70

Browse files
committed
feat: support incremental scan between 2 snapshots
Signed-off-by: xxchan <[email protected]>
1 parent 1d217d4 commit 3ac5b70

File tree

2 files changed

+233
-57
lines changed

2 files changed

+233
-57
lines changed

crates/iceberg/src/scan/context.rs

Lines changed: 158 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::collections::HashSet;
1819
use std::sync::Arc;
1920

2021
use futures::channel::mpsc::Sender;
2122
use futures::{SinkExt, TryFutureExt};
23+
use itertools::Itertools;
2224

2325
use crate::delete_file_index::DeleteFileIndex;
2426
use crate::expr::{Bind, BoundPredicate, Predicate};
@@ -28,11 +30,12 @@ use crate::scan::{
2830
PartitionFilterCache,
2931
};
3032
use crate::spec::{
31-
ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, SchemaRef, SnapshotRef,
32-
TableMetadataRef,
33+
DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList,
34+
ManifestStatus, Operation, SchemaRef, SnapshotRef, TableMetadataRef,
3335
};
3436
use crate::{Error, ErrorKind, Result};
3537

38+
/// Predicate evaluated against each manifest entry while a manifest's entries
/// are iterated; only entries for which it returns `true` are kept.
///
/// The `Send + Sync` bounds allow the predicate to be held as
/// `Arc<ManifestEntryFilterFn>` and handed to several manifest file contexts.
type ManifestEntryFilterFn = dyn Fn(&ManifestEntryRef) -> bool + Send + Sync;
3639
/// Wraps a [`ManifestFile`] alongside the objects that are needed
3740
/// to process it in a thread-safe manner
3841
pub(crate) struct ManifestFileContext {
@@ -45,7 +48,11 @@ pub(crate) struct ManifestFileContext {
4548
object_cache: Arc<ObjectCache>,
4649
snapshot_schema: SchemaRef,
4750
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
48-
delete_file_index: DeleteFileIndex,
51+
delete_file_index: Option<DeleteFileIndex>,
52+
53+
/// filter manifest entries.
54+
/// Used for different kinds of scans, e.g., scanning only newly added files, without delete files.
55+
filter_fn: Option<Arc<ManifestEntryFilterFn>>,
4956
}
5057

5158
/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -58,7 +65,7 @@ pub(crate) struct ManifestEntryContext {
5865
pub bound_predicates: Option<Arc<BoundPredicates>>,
5966
pub partition_spec_id: i32,
6067
pub snapshot_schema: SchemaRef,
61-
pub delete_file_index: DeleteFileIndex,
68+
pub delete_file_index: Option<DeleteFileIndex>,
6269
}
6370

6471
impl ManifestFileContext {
@@ -74,12 +81,13 @@ impl ManifestFileContext {
7481
mut sender,
7582
expression_evaluator_cache,
7683
delete_file_index,
77-
..
84+
filter_fn,
7885
} = self;
86+
let filter_fn = filter_fn.unwrap_or_else(|| Arc::new(|_| true));
7987

8088
let manifest = object_cache.get_manifest(&manifest_file).await?;
8189

82-
for manifest_entry in manifest.entries() {
90+
for manifest_entry in manifest.entries().iter().filter(|e| filter_fn(e)) {
8391
let manifest_entry_context = ManifestEntryContext {
8492
// TODO: refactor to avoid the expensive ManifestEntry clone
8593
manifest_entry: manifest_entry.clone(),
@@ -105,13 +113,16 @@ impl ManifestEntryContext {
105113
/// consume this `ManifestEntryContext`, returning a `FileScanTask`
106114
/// created from it
107115
pub(crate) async fn into_file_scan_task(self) -> Result<FileScanTask> {
108-
let deletes = self
109-
.delete_file_index
110-
.get_deletes_for_data_file(
111-
self.manifest_entry.data_file(),
112-
self.manifest_entry.sequence_number(),
113-
)
114-
.await;
116+
let deletes = if let Some(delete_file_index) = self.delete_file_index {
117+
delete_file_index
118+
.get_deletes_for_data_file(
119+
self.manifest_entry.data_file(),
120+
self.manifest_entry.sequence_number(),
121+
)
122+
.await
123+
} else {
124+
vec![]
125+
};
115126

116127
Ok(FileScanTask {
117128
start: 0,
@@ -149,6 +160,11 @@ pub(crate) struct PlanContext {
149160
pub partition_filter_cache: Arc<PartitionFilterCache>,
150161
pub manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
151162
pub expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
163+
164+
// for incremental scan.
165+
// If `to_snapshot_id` is set, it means incremental scan. `from_snapshot_id` can be `None`.
166+
pub from_snapshot_id: Option<i64>,
167+
pub to_snapshot_id: Option<i64>,
152168
}
153169

154170
impl PlanContext {
@@ -180,23 +196,84 @@ impl PlanContext {
180196
Ok(partition_filter)
181197
}
182198

183-
pub(crate) fn build_manifest_file_contexts(
199+
pub(crate) async fn build_manifest_file_contexts(
184200
&self,
185-
manifest_list: Arc<ManifestList>,
186201
tx_data: Sender<ManifestEntryContext>,
187-
delete_file_idx: DeleteFileIndex,
188-
delete_file_tx: Sender<ManifestEntryContext>,
202+
delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<ManifestEntryContext>)>,
189203
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
190-
let manifest_files = manifest_list.entries().iter();
204+
let mut filter_fn: Option<Arc<ManifestEntryFilterFn>> = None;
205+
let manifest_files = {
206+
if let Some(to_snapshot_id) = self.to_snapshot_id {
207+
// Incremental scan mode:
208+
// Get all added files between two snapshots.
209+
// - data files in `Append` and `Overwrite` snapshots are included.
210+
// - delete files are ignored
211+
// - `Replace` snapshots (e.g., compaction) are ignored.
212+
//
213+
// `to_snapshot_id` (the latest snapshot) is inclusive, `from_snapshot_id` (the oldest snapshot) is exclusive.
214+
215+
// prevent misuse
216+
assert!(
217+
delete_file_idx_and_tx.is_none(),
218+
"delete file is not supported in incremental scan mode"
219+
);
220+
221+
let snapshots =
222+
ancestors_between(&self.table_metadata, to_snapshot_id, self.from_snapshot_id)
223+
.filter(|snapshot| {
224+
matches!(
225+
snapshot.summary().operation,
226+
Operation::Append | Operation::Overwrite
227+
)
228+
})
229+
.collect_vec();
230+
let snapshot_ids: HashSet<i64> = snapshots
231+
.iter()
232+
.map(|snapshot| snapshot.snapshot_id())
233+
.collect();
234+
235+
let mut manifest_files = vec![];
236+
for snapshot in snapshots {
237+
let manifest_list = self
238+
.object_cache
239+
.get_manifest_list(&snapshot, &self.table_metadata)
240+
.await?;
241+
for entry in manifest_list.entries() {
242+
if !snapshot_ids.contains(&entry.added_snapshot_id) {
243+
continue;
244+
}
245+
manifest_files.push(entry.clone());
246+
}
247+
}
248+
249+
filter_fn = Some(Arc::new(move |entry: &ManifestEntryRef| {
250+
matches!(entry.status(), ManifestStatus::Added)
251+
&& matches!(entry.data_file().content_type(), DataContentType::Data)
252+
&& (
253+
// Is it possible that the snapshot id here is not contained?
254+
entry.snapshot_id().is_none()
255+
|| snapshot_ids.contains(&entry.snapshot_id().unwrap())
256+
)
257+
}));
258+
259+
manifest_files
260+
} else {
261+
let manifest_list = self.get_manifest_list().await?;
262+
manifest_list.entries().to_vec()
263+
}
264+
};
191265

192266
// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
193267
let mut filtered_mfcs = vec![];
194268

195-
for manifest_file in manifest_files {
196-
let tx = if manifest_file.content == ManifestContentType::Deletes {
197-
delete_file_tx.clone()
269+
for manifest_file in &manifest_files {
270+
let (delete_file_idx, tx) = if manifest_file.content == ManifestContentType::Deletes {
271+
let Some((delete_file_idx, tx)) = delete_file_idx_and_tx.as_ref() else {
272+
continue;
273+
};
274+
(Some(delete_file_idx.clone()), tx.clone())
198275
} else {
199-
tx_data.clone()
276+
(delete_file_idx_and_tx.as_ref().map(|(idx, _)| idx.clone()), tx_data.clone())
200277
};
201278

202279
let partition_bound_predicate = if self.predicate.is_some() {
@@ -224,7 +301,8 @@ impl PlanContext {
224301
manifest_file,
225302
partition_bound_predicate,
226303
tx,
227-
delete_file_idx.clone(),
304+
delete_file_idx,
305+
filter_fn.clone(),
228306
);
229307

230308
filtered_mfcs.push(Ok(mfc));
@@ -238,7 +316,8 @@ impl PlanContext {
238316
manifest_file: &ManifestFile,
239317
partition_filter: Option<Arc<BoundPredicate>>,
240318
sender: Sender<ManifestEntryContext>,
241-
delete_file_index: DeleteFileIndex,
319+
delete_file_index: Option<DeleteFileIndex>,
320+
filter_fn: Option<Arc<ManifestEntryFilterFn>>,
242321
) -> ManifestFileContext {
243322
let bound_predicates =
244323
if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) =
@@ -261,6 +340,61 @@ impl PlanContext {
261340
field_ids: self.field_ids.clone(),
262341
expression_evaluator_cache: self.expression_evaluator_cache.clone(),
263342
delete_file_index,
343+
filter_fn,
264344
}
265345
}
266346
}
347+
348+
struct Ancestors {
349+
next: Option<SnapshotRef>,
350+
get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
351+
}
352+
353+
impl Iterator for Ancestors {
354+
type Item = SnapshotRef;
355+
356+
fn next(&mut self) -> Option<Self::Item> {
357+
let snapshot = self.next.take()?;
358+
let result = snapshot.clone();
359+
self.next = snapshot
360+
.parent_snapshot_id()
361+
.and_then(|id| (self.get_snapshot)(id));
362+
Some(result)
363+
}
364+
}
365+
366+
/// Iterate starting from `snapshot` (inclusive) to the root snapshot.
367+
fn ancestors_of(
368+
table_metadata: &TableMetadataRef,
369+
snapshot: i64,
370+
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
371+
if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) {
372+
let table_metadata = table_metadata.clone();
373+
Box::new(Ancestors {
374+
next: Some(snapshot.clone()),
375+
get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
376+
})
377+
} else {
378+
Box::new(std::iter::empty())
379+
}
380+
}
381+
382+
/// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive).
383+
fn ancestors_between(
384+
table_metadata: &TableMetadataRef,
385+
latest_snapshot_id: i64,
386+
oldest_snapshot_id: Option<i64>,
387+
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
388+
let Some(oldest_snapshot_id) = oldest_snapshot_id else {
389+
return Box::new(ancestors_of(table_metadata, latest_snapshot_id));
390+
};
391+
392+
if latest_snapshot_id == oldest_snapshot_id {
393+
return Box::new(std::iter::empty());
394+
}
395+
396+
Box::new(
397+
ancestors_of(table_metadata, latest_snapshot_id)
398+
.take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id),
399+
)
400+
}

0 commit comments

Comments
 (0)