15
15
// specific language governing permissions and limitations
16
16
// under the License.
17
17
18
+ use std:: collections:: HashSet ;
18
19
use std:: sync:: Arc ;
19
20
20
21
use futures:: channel:: mpsc:: Sender ;
21
22
use futures:: { SinkExt , TryFutureExt } ;
23
+ use itertools:: Itertools ;
22
24
23
25
use crate :: delete_file_index:: DeleteFileIndex ;
24
26
use crate :: expr:: { Bind , BoundPredicate , Predicate } ;
@@ -28,11 +30,12 @@ use crate::scan::{
28
30
PartitionFilterCache ,
29
31
} ;
30
32
use crate :: spec:: {
31
- ManifestContentType , ManifestEntryRef , ManifestFile , ManifestList , SchemaRef , SnapshotRef ,
32
- TableMetadataRef ,
33
+ DataContentType , ManifestContentType , ManifestEntryRef , ManifestFile , ManifestList ,
34
+ ManifestStatus , Operation , SchemaRef , SnapshotRef , TableMetadataRef ,
33
35
} ;
34
36
use crate :: { Error , ErrorKind , Result } ;
35
37
38
+ type ManifestEntryFilterFn = dyn Fn ( & ManifestEntryRef ) -> bool + Send + Sync ;
36
39
/// Wraps a [`ManifestFile`] alongside the objects that are needed
37
40
/// to process it in a thread-safe manner
38
41
pub ( crate ) struct ManifestFileContext {
@@ -45,7 +48,11 @@ pub(crate) struct ManifestFileContext {
45
48
object_cache : Arc < ObjectCache > ,
46
49
snapshot_schema : SchemaRef ,
47
50
expression_evaluator_cache : Arc < ExpressionEvaluatorCache > ,
48
- delete_file_index : DeleteFileIndex ,
51
+ delete_file_index : Option < DeleteFileIndex > ,
52
+
53
+ /// filter manifest entries.
54
+ /// Used for different kind of scans, e.g., only scan newly added files without delete files.
55
+ filter_fn : Option < Arc < ManifestEntryFilterFn > > ,
49
56
}
50
57
51
58
/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -58,7 +65,7 @@ pub(crate) struct ManifestEntryContext {
58
65
pub bound_predicates : Option < Arc < BoundPredicates > > ,
59
66
pub partition_spec_id : i32 ,
60
67
pub snapshot_schema : SchemaRef ,
61
- pub delete_file_index : DeleteFileIndex ,
68
+ pub delete_file_index : Option < DeleteFileIndex > ,
62
69
}
63
70
64
71
impl ManifestFileContext {
@@ -74,12 +81,13 @@ impl ManifestFileContext {
74
81
mut sender,
75
82
expression_evaluator_cache,
76
83
delete_file_index,
77
- ..
84
+ filter_fn ,
78
85
} = self ;
86
+ let filter_fn = filter_fn. unwrap_or_else ( || Arc :: new ( |_| true ) ) ;
79
87
80
88
let manifest = object_cache. get_manifest ( & manifest_file) . await ?;
81
89
82
- for manifest_entry in manifest. entries ( ) {
90
+ for manifest_entry in manifest. entries ( ) . iter ( ) . filter ( |e| filter_fn ( e ) ) {
83
91
let manifest_entry_context = ManifestEntryContext {
84
92
// TODO: refactor to avoid the expensive ManifestEntry clone
85
93
manifest_entry : manifest_entry. clone ( ) ,
@@ -105,13 +113,16 @@ impl ManifestEntryContext {
105
113
/// consume this `ManifestEntryContext`, returning a `FileScanTask`
106
114
/// created from it
107
115
pub ( crate ) async fn into_file_scan_task ( self ) -> Result < FileScanTask > {
108
- let deletes = self
109
- . delete_file_index
110
- . get_deletes_for_data_file (
111
- self . manifest_entry . data_file ( ) ,
112
- self . manifest_entry . sequence_number ( ) ,
113
- )
114
- . await ;
116
+ let deletes = if let Some ( delete_file_index) = self . delete_file_index {
117
+ delete_file_index
118
+ . get_deletes_for_data_file (
119
+ self . manifest_entry . data_file ( ) ,
120
+ self . manifest_entry . sequence_number ( ) ,
121
+ )
122
+ . await
123
+ } else {
124
+ vec ! [ ]
125
+ } ;
115
126
116
127
Ok ( FileScanTask {
117
128
start : 0 ,
@@ -150,6 +161,11 @@ pub(crate) struct PlanContext {
150
161
pub partition_filter_cache : Arc < PartitionFilterCache > ,
151
162
pub manifest_evaluator_cache : Arc < ManifestEvaluatorCache > ,
152
163
pub expression_evaluator_cache : Arc < ExpressionEvaluatorCache > ,
164
+
165
+ // for incremental scan.
166
+ // If `to_snapshot_id` is set, it means incremental scan. `from_snapshot_id` can be `None`.
167
+ pub from_snapshot_id : Option < i64 > ,
168
+ pub to_snapshot_id : Option < i64 > ,
153
169
}
154
170
155
171
impl PlanContext {
@@ -181,23 +197,84 @@ impl PlanContext {
181
197
Ok ( partition_filter)
182
198
}
183
199
184
- pub ( crate ) fn build_manifest_file_contexts (
200
+ pub ( crate ) async fn build_manifest_file_contexts (
185
201
& self ,
186
- manifest_list : Arc < ManifestList > ,
187
202
tx_data : Sender < ManifestEntryContext > ,
188
- delete_file_idx : DeleteFileIndex ,
189
- delete_file_tx : Sender < ManifestEntryContext > ,
203
+ delete_file_idx_and_tx : Option < ( DeleteFileIndex , Sender < ManifestEntryContext > ) > ,
190
204
) -> Result < Box < impl Iterator < Item = Result < ManifestFileContext > > + ' static > > {
191
- let manifest_files = manifest_list. entries ( ) . iter ( ) ;
205
+ let mut filter_fn: Option < Arc < ManifestEntryFilterFn > > = None ;
206
+ let manifest_files = {
207
+ if let Some ( to_snapshot_id) = self . to_snapshot_id {
208
+ // Incremental scan mode:
209
+ // Get all added files between two snapshots.
210
+ // - data files in `Append` and `Overwrite` snapshots are included.
211
+ // - delete files are ignored
212
+ // - `Replace` snapshots (e.g., compaction) are ignored.
213
+ //
214
+ // `latest_snapshot_id` is inclusive, `oldest_snapshot_id` is exclusive.
215
+
216
+ // prevent misuse
217
+ assert ! (
218
+ delete_file_idx_and_tx. is_none( ) ,
219
+ "delete file is not supported in incremental scan mode"
220
+ ) ;
221
+
222
+ let snapshots =
223
+ ancestors_between ( & self . table_metadata , to_snapshot_id, self . from_snapshot_id )
224
+ . filter ( |snapshot| {
225
+ matches ! (
226
+ snapshot. summary( ) . operation,
227
+ Operation :: Append | Operation :: Overwrite
228
+ )
229
+ } )
230
+ . collect_vec ( ) ;
231
+ let snapshot_ids: HashSet < i64 > = snapshots
232
+ . iter ( )
233
+ . map ( |snapshot| snapshot. snapshot_id ( ) )
234
+ . collect ( ) ;
235
+
236
+ let mut manifest_files = vec ! [ ] ;
237
+ for snapshot in snapshots {
238
+ let manifest_list = self
239
+ . object_cache
240
+ . get_manifest_list ( & snapshot, & self . table_metadata )
241
+ . await ?;
242
+ for entry in manifest_list. entries ( ) {
243
+ if !snapshot_ids. contains ( & entry. added_snapshot_id ) {
244
+ continue ;
245
+ }
246
+ manifest_files. push ( entry. clone ( ) ) ;
247
+ }
248
+ }
249
+
250
+ filter_fn = Some ( Arc :: new ( move |entry : & ManifestEntryRef | {
251
+ matches ! ( entry. status( ) , ManifestStatus :: Added )
252
+ && matches ! ( entry. data_file( ) . content_type( ) , DataContentType :: Data )
253
+ && (
254
+ // Is it possible that the snapshot id here is not contained?
255
+ entry. snapshot_id ( ) . is_none ( )
256
+ || snapshot_ids. contains ( & entry. snapshot_id ( ) . unwrap ( ) )
257
+ )
258
+ } ) ) ;
259
+
260
+ manifest_files
261
+ } else {
262
+ let manifest_list = self . get_manifest_list ( ) . await ?;
263
+ manifest_list. entries ( ) . to_vec ( )
264
+ }
265
+ } ;
192
266
193
267
// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
194
268
let mut filtered_mfcs = vec ! [ ] ;
195
269
196
- for manifest_file in manifest_files {
197
- let tx = if manifest_file. content == ManifestContentType :: Deletes {
198
- delete_file_tx. clone ( )
270
+ for manifest_file in & manifest_files {
271
+ let ( delete_file_idx, tx) = if manifest_file. content == ManifestContentType :: Deletes {
272
+ let Some ( ( delete_file_idx, tx) ) = delete_file_idx_and_tx. as_ref ( ) else {
273
+ continue ;
274
+ } ;
275
+ ( Some ( delete_file_idx. clone ( ) ) , tx. clone ( ) )
199
276
} else {
200
- tx_data. clone ( )
277
+ ( delete_file_idx_and_tx . as_ref ( ) . map ( | ( idx , _ ) | idx . clone ( ) ) , tx_data. clone ( ) )
201
278
} ;
202
279
203
280
let partition_bound_predicate = if self . predicate . is_some ( ) {
@@ -225,7 +302,8 @@ impl PlanContext {
225
302
manifest_file,
226
303
partition_bound_predicate,
227
304
tx,
228
- delete_file_idx. clone ( ) ,
305
+ delete_file_idx,
306
+ filter_fn. clone ( ) ,
229
307
) ;
230
308
231
309
filtered_mfcs. push ( Ok ( mfc) ) ;
@@ -239,7 +317,8 @@ impl PlanContext {
239
317
manifest_file : & ManifestFile ,
240
318
partition_filter : Option < Arc < BoundPredicate > > ,
241
319
sender : Sender < ManifestEntryContext > ,
242
- delete_file_index : DeleteFileIndex ,
320
+ delete_file_index : Option < DeleteFileIndex > ,
321
+ filter_fn : Option < Arc < ManifestEntryFilterFn > > ,
243
322
) -> ManifestFileContext {
244
323
let bound_predicates =
245
324
if let ( Some ( ref partition_bound_predicate) , Some ( snapshot_bound_predicate) ) =
@@ -262,6 +341,61 @@ impl PlanContext {
262
341
field_ids : self . field_ids . clone ( ) ,
263
342
expression_evaluator_cache : self . expression_evaluator_cache . clone ( ) ,
264
343
delete_file_index,
344
+ filter_fn,
265
345
}
266
346
}
267
347
}
348
+
349
+ struct Ancestors {
350
+ next : Option < SnapshotRef > ,
351
+ get_snapshot : Box < dyn Fn ( i64 ) -> Option < SnapshotRef > + Send > ,
352
+ }
353
+
354
+ impl Iterator for Ancestors {
355
+ type Item = SnapshotRef ;
356
+
357
+ fn next ( & mut self ) -> Option < Self :: Item > {
358
+ let snapshot = self . next . take ( ) ?;
359
+ let result = snapshot. clone ( ) ;
360
+ self . next = snapshot
361
+ . parent_snapshot_id ( )
362
+ . and_then ( |id| ( self . get_snapshot ) ( id) ) ;
363
+ Some ( result)
364
+ }
365
+ }
366
+
367
+ /// Iterate starting from `snapshot` (inclusive) to the root snapshot.
368
+ fn ancestors_of (
369
+ table_metadata : & TableMetadataRef ,
370
+ snapshot : i64 ,
371
+ ) -> Box < dyn Iterator < Item = SnapshotRef > + Send > {
372
+ if let Some ( snapshot) = table_metadata. snapshot_by_id ( snapshot) {
373
+ let table_metadata = table_metadata. clone ( ) ;
374
+ Box :: new ( Ancestors {
375
+ next : Some ( snapshot. clone ( ) ) ,
376
+ get_snapshot : Box :: new ( move |id| table_metadata. snapshot_by_id ( id) . cloned ( ) ) ,
377
+ } )
378
+ } else {
379
+ Box :: new ( std:: iter:: empty ( ) )
380
+ }
381
+ }
382
+
383
+ /// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive).
384
+ fn ancestors_between (
385
+ table_metadata : & TableMetadataRef ,
386
+ latest_snapshot_id : i64 ,
387
+ oldest_snapshot_id : Option < i64 > ,
388
+ ) -> Box < dyn Iterator < Item = SnapshotRef > + Send > {
389
+ let Some ( oldest_snapshot_id) = oldest_snapshot_id else {
390
+ return Box :: new ( ancestors_of ( table_metadata, latest_snapshot_id) ) ;
391
+ } ;
392
+
393
+ if latest_snapshot_id == oldest_snapshot_id {
394
+ return Box :: new ( std:: iter:: empty ( ) ) ;
395
+ }
396
+
397
+ Box :: new (
398
+ ancestors_of ( table_metadata, latest_snapshot_id)
399
+ . take_while ( move |snapshot| snapshot. snapshot_id ( ) != oldest_snapshot_id) ,
400
+ )
401
+ }
0 commit comments