Skip to content

Commit e1de63d

Browse files
authored
fix: support empty scans (#1166)
## Which issue does this PR close?

Closes #1145.

## What changes are included in this PR?

Allow scanning of a table with no snapshots. As far as I can tell, iceberg-python has [analogous logic](https://github.com/apache/iceberg-python/blob/main/pyiceberg/table/__init__.py#L1684-L1686) to bail out when there is no snapshot.

## Are these changes tested?

Yes: `test_plan_files_on_table_without_any_snapshots`.
1 parent fcc8892 commit e1de63d

File tree

2 files changed

+139
-16
lines changed

2 files changed

+139
-16
lines changed

crates/iceberg/src/scan/mod.rs

Lines changed: 82 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -214,14 +214,23 @@ impl<'a> TableScanBuilder<'a> {
214214
)
215215
})?
216216
.clone(),
217-
None => self
218-
.table
219-
.metadata()
220-
.current_snapshot()
221-
.ok_or_else(|| {
222-
Error::new(ErrorKind::Unexpected, "Can't scan table without snapshots")
223-
})?
224-
.clone(),
217+
None => {
218+
let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else {
219+
return Ok(TableScan {
220+
batch_size: self.batch_size,
221+
column_names: self.column_names,
222+
file_io: self.table.file_io().clone(),
223+
plan_context: None,
224+
concurrency_limit_data_files: self.concurrency_limit_data_files,
225+
concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
226+
concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
227+
row_group_filtering_enabled: self.row_group_filtering_enabled,
228+
row_selection_enabled: self.row_selection_enabled,
229+
delete_file_processing_enabled: self.delete_file_processing_enabled,
230+
});
231+
};
232+
current_snapshot_id.clone()
233+
}
225234
};
226235

227236
let schema = snapshot.schema(self.table.metadata())?;
@@ -302,7 +311,7 @@ impl<'a> TableScanBuilder<'a> {
302311
batch_size: self.batch_size,
303312
column_names: self.column_names,
304313
file_io: self.table.file_io().clone(),
305-
plan_context,
314+
plan_context: Some(plan_context),
306315
concurrency_limit_data_files: self.concurrency_limit_data_files,
307316
concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
308317
concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
@@ -316,7 +325,10 @@ impl<'a> TableScanBuilder<'a> {
316325
/// Table scan.
317326
#[derive(Debug)]
318327
pub struct TableScan {
319-
plan_context: PlanContext,
328+
/// A [PlanContext], if this table has at least one snapshot, otherwise None.
329+
///
330+
/// If this is None, then the scan contains no rows.
331+
plan_context: Option<PlanContext>,
320332
batch_size: Option<usize>,
321333
file_io: FileIO,
322334
column_names: Option<Vec<String>>,
@@ -340,6 +352,10 @@ pub struct TableScan {
340352
impl TableScan {
341353
/// Returns a stream of [`FileScanTask`]s.
342354
pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
355+
let Some(plan_context) = self.plan_context.as_ref() else {
356+
return Ok(Box::pin(futures::stream::empty()));
357+
};
358+
343359
let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
344360
let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;
345361

@@ -359,12 +375,12 @@ impl TableScan {
359375
None
360376
};
361377

362-
let manifest_list = self.plan_context.get_manifest_list().await?;
378+
let manifest_list = plan_context.get_manifest_list().await?;
363379

364380
// get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
365381
// whose partitions cannot match this
366382
// scan's filter
367-
let manifest_file_contexts = self.plan_context.build_manifest_file_contexts(
383+
let manifest_file_contexts = plan_context.build_manifest_file_contexts(
368384
manifest_list,
369385
manifest_entry_data_ctx_tx,
370386
delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| {
@@ -463,8 +479,8 @@ impl TableScan {
463479
}
464480

465481
/// Returns a reference to the snapshot of the table scan.
466-
pub fn snapshot(&self) -> &SnapshotRef {
467-
&self.plan_context.snapshot
482+
pub fn snapshot(&self) -> Option<&SnapshotRef> {
483+
self.plan_context.as_ref().map(|x| &x.snapshot)
468484
}
469485

470486
async fn process_data_manifest_entry(
@@ -652,6 +668,45 @@ pub mod tests {
652668
}
653669
}
654670

671+
#[allow(clippy::new_without_default)]
672+
pub fn new_empty() -> Self {
673+
let tmp_dir = TempDir::new().unwrap();
674+
let table_location = tmp_dir.path().join("table1");
675+
let table_metadata1_location = table_location.join("metadata/v1.json");
676+
677+
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
678+
.unwrap()
679+
.build()
680+
.unwrap();
681+
682+
let table_metadata = {
683+
let template_json_str = fs::read_to_string(format!(
684+
"{}/testdata/example_empty_table_metadata_v2.json",
685+
env!("CARGO_MANIFEST_DIR")
686+
))
687+
.unwrap();
688+
let mut context = Context::new();
689+
context.insert("table_location", &table_location);
690+
context.insert("table_metadata_1_location", &table_metadata1_location);
691+
692+
let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap();
693+
serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
694+
};
695+
696+
let table = Table::builder()
697+
.metadata(table_metadata)
698+
.identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
699+
.file_io(file_io.clone())
700+
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
701+
.build()
702+
.unwrap();
703+
704+
Self {
705+
table_location: table_location.to_str().unwrap().to_string(),
706+
table,
707+
}
708+
}
709+
655710
pub fn new_unpartitioned() -> Self {
656711
let tmp_dir = TempDir::new().unwrap();
657712
let table_location = tmp_dir.path().join("table1");
@@ -1178,7 +1233,7 @@ pub mod tests {
11781233
let table_scan = table.scan().build().unwrap();
11791234
assert_eq!(
11801235
table.metadata().current_snapshot().unwrap().snapshot_id(),
1181-
table_scan.snapshot().snapshot_id()
1236+
table_scan.snapshot().unwrap().snapshot_id()
11821237
);
11831238
}
11841239

@@ -1200,7 +1255,18 @@ pub mod tests {
12001255
.with_row_selection_enabled(true)
12011256
.build()
12021257
.unwrap();
1203-
assert_eq!(table_scan.snapshot().snapshot_id(), 3051729675574597004);
1258+
assert_eq!(
1259+
table_scan.snapshot().unwrap().snapshot_id(),
1260+
3051729675574597004
1261+
);
1262+
}
1263+
1264+
#[tokio::test]
1265+
async fn test_plan_files_on_table_without_any_snapshots() {
1266+
let table = TableTestFixture::new_empty().table;
1267+
let batch_stream = table.scan().build().unwrap().to_arrow().await.unwrap();
1268+
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
1269+
assert!(batches.is_empty());
12041270
}
12051271

12061272
#[tokio::test]
Lines changed: 57 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,57 @@
1+
{
2+
"format-version": 2,
3+
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
4+
"location": "{{ table_location }}",
5+
"last-sequence-number": 34,
6+
"last-updated-ms": 1602638573590,
7+
"last-column-id": 3,
8+
"current-schema-id": 1,
9+
"schemas": [
10+
{
11+
"type": "struct",
12+
"schema-id": 0,
13+
"fields": [
14+
{"id": 1, "name": "x", "required": true, "type": "long"}
15+
]},
16+
{
17+
"type": "struct",
18+
"schema-id": 1,
19+
"identifier-field-ids": [1, 2],
20+
"fields": [
21+
{"id": 1, "name": "x", "required": true, "type": "long"},
22+
{"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"},
23+
{"id": 3, "name": "z", "required": true, "type": "long"},
24+
{"id": 4, "name": "a", "required": true, "type": "string"},
25+
{"id": 5, "name": "dbl", "required": true, "type": "double"},
26+
{"id": 6, "name": "i32", "required": true, "type": "int"},
27+
{"id": 7, "name": "i64", "required": true, "type": "long"},
28+
{"id": 8, "name": "bool", "required": true, "type": "boolean"}
29+
]
30+
}
31+
],
32+
"default-spec-id": 0,
33+
"partition-specs": [
34+
{
35+
"spec-id": 0,
36+
"fields": [
37+
{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}
38+
]
39+
}
40+
],
41+
"last-partition-id": 1000,
42+
"default-sort-order-id": 3,
43+
"sort-orders": [
44+
{
45+
"order-id": 3,
46+
"fields": [
47+
{"transform": "identity", "source-id": 2, "direction": "asc", "null-order": "nulls-first"},
48+
{"transform": "bucket[4]", "source-id": 3, "direction": "desc", "null-order": "nulls-last"}
49+
]
50+
}
51+
],
52+
"properties": {"read.split.target.size": "134217728"},
53+
"snapshots": [],
54+
"snapshot-log": [],
55+
"metadata-log": [{"metadata-file": "{{ table_metadata_1_location }}", "timestamp-ms": 1515100}],
56+
"refs": {}
57+
}

0 commit comments

Comments
 (0)