Commit 157891a

add test
Signed-off-by: xxchan <[email protected]>
1 parent 3ac5b70 commit 157891a

2 files changed: +278 -9 lines changed

crates/iceberg/src/scan/context.rs

Lines changed: 9 additions & 4 deletions
@@ -250,9 +250,11 @@ impl PlanContext {
             matches!(entry.status(), ManifestStatus::Added)
                 && matches!(entry.data_file().content_type(), DataContentType::Data)
                 && (
-                    // Is it possible that the snapshot id here is not contained?
-                    entry.snapshot_id().is_none()
-                        || snapshot_ids.contains(&entry.snapshot_id().unwrap())
+                    entry
+                        .snapshot_id()
+                        .map(|id| snapshot_ids.contains(&id))
+                        .unwrap_or(true)
+                    // Include entries without snapshot_id
                 )
         }));
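
As a side note on the predicate rewrite above: a minimal, dependency-free sketch (the function name and the HashSet stand-in for `snapshot_ids` are illustrative, not from the commit) showing that the new `map(...).unwrap_or(true)` form keeps the old behavior: entries without a snapshot id are still included, and entries with one are kept only when the id is in the allowed set.

use std::collections::HashSet;

// Stand-in for the manifest-entry filter in PlanContext: keep an entry when it has no
// snapshot id, or when its snapshot id is part of the incremental snapshot range.
fn keep_entry(entry_snapshot_id: Option<i64>, snapshot_ids: &HashSet<i64>) -> bool {
    entry_snapshot_id
        .map(|id| snapshot_ids.contains(&id))
        .unwrap_or(true) // include entries without a snapshot_id
}

fn main() {
    let snapshot_ids: HashSet<i64> = [101, 102].into_iter().collect();
    assert!(keep_entry(None, &snapshot_ids)); // no snapshot id -> included
    assert!(keep_entry(Some(101), &snapshot_ids)); // id in range -> included
    assert!(!keep_entry(Some(999), &snapshot_ids)); // id outside range -> excluded
}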

@@ -273,7 +275,10 @@ impl PlanContext {
             };
             (Some(delete_file_idx.clone()), tx.clone())
         } else {
-            (delete_file_idx_and_tx.as_ref().map(|(idx, _)| idx.clone()), tx_data.clone())
+            (
+                delete_file_idx_and_tx.as_ref().map(|(idx, _)| idx.clone()),
+                tx_data.clone(),
+            )
         };

         let partition_bound_predicate = if self.predicate.is_some() {

crates/iceberg/src/scan/mod.rs

Lines changed: 269 additions & 5 deletions
@@ -137,12 +137,18 @@ impl<'a> TableScanBuilder<'a> {
     }

     /// Set the starting snapshot id (exclusive) for incremental scan.
+    ///
+    /// # Behavior
+    /// - Only includes files from Append and Overwrite operations
+    /// - Excludes Replace operations (e.g., compaction)
+    /// - Only returns Added manifest entries with Data content
+    /// - Delete files are not supported in incremental scans
     pub fn from_snapshot_id(mut self, from_snapshot_id: i64) -> Self {
         self.from_snapshot_id = Some(from_snapshot_id);
         self
     }

-    /// Set the ending snapshot id (inclusive) for incremental scan.
+    /// Set the ending snapshot id (inclusive) for incremental scan (See [`Self::from_snapshot_id`]).
     pub fn to_snapshot_id(mut self, to_snapshot_id: i64) -> Self {
         self.to_snapshot_id = Some(to_snapshot_id);
         self
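
For context, a hedged usage sketch of the incremental scan API documented above, based only on the builder calls exercised by the tests in this commit (`scan()`, `from_snapshot_id`, `to_snapshot_id`, `build()`, `plan_files()`); the helper name, the import paths, and returning a count are illustrative assumptions, not part of the commit.

use futures::TryStreamExt;
use iceberg::table::Table;
use iceberg::Result;

// Illustrative helper, not part of this commit: count the data files added after
// `from_id` (exclusive) up to and including `to_id` (inclusive).
async fn count_incremental_files(table: &Table, from_id: i64, to_id: i64) -> Result<usize> {
    let scan = table
        .scan()
        .from_snapshot_id(from_id) // exclusive lower bound
        .to_snapshot_id(to_id) // inclusive upper bound
        .build()?;

    // plan_files() yields a stream of file scan tasks; collect it to count the
    // files added between the two snapshots.
    let tasks: Vec<_> = scan.plan_files().await?.try_collect().await?;
    Ok(tasks.len())
}

The tests added later in this commit follow the same pattern and then call `to_arrow()` on the scan to read the added rows back.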
@@ -209,7 +215,7 @@ impl<'a> TableScanBuilder<'a> {
             if self.to_snapshot_id.is_none() {
                 return Err(Error::new(
                     ErrorKind::DataInvalid,
-                    "Incremental scan requires to_snapshot_id to be set",
+                    "Incremental scan requires to_snapshot_id to be set. Use from_snapshot_id (exclusive) and to_snapshot_id (inclusive) to specify the range.",
                 ));
             }

@@ -221,7 +227,39 @@ impl<'a> TableScanBuilder<'a> {
                 ));
             }

+            let to_snapshot_id = self.to_snapshot_id.unwrap();
+
+            // Validate to_snapshot_id exists
+            if self
+                .table
+                .metadata()
+                .snapshot_by_id(to_snapshot_id)
+                .is_none()
+            {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("to_snapshot_id {} not found", to_snapshot_id),
+                ));
+            }
+
+            // Validate from_snapshot_id if provided
+            if let Some(from_id) = self.from_snapshot_id {
+                // Validate from_snapshot_id exists
+                if self.table.metadata().snapshot_by_id(from_id).is_none() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("from_snapshot_id {} not found", from_id),
+                    ));
+                }

+                // Validate snapshot order
+                if to_snapshot_id <= from_id {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "to_snapshot_id must be greater than from_snapshot_id",
+                    ));
+                }
+            }
         }

         let snapshot = match self.snapshot_id {
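
The checks above run in a fixed order (to_snapshot_id existence, then from_snapshot_id existence, then range ordering), and the edge-case test later in this commit relies on that order. A dependency-free sketch of the same ordering, where the function, the HashSet of known ids, and the String error are illustrative stand-ins for the builder and its Error type:

use std::collections::HashSet;

// Illustrative stand-in for the builder validation: `known` plays the role of the
// snapshot ids present in the table metadata.
fn validate_incremental_range(
    known: &HashSet<i64>,
    from_snapshot_id: Option<i64>,
    to_snapshot_id: i64,
) -> Result<(), String> {
    // 1. to_snapshot_id must exist (checked first, as the tests assert)
    if !known.contains(&to_snapshot_id) {
        return Err(format!("to_snapshot_id {} not found", to_snapshot_id));
    }
    if let Some(from_id) = from_snapshot_id {
        // 2. from_snapshot_id must exist when provided
        if !known.contains(&from_id) {
            return Err(format!("from_snapshot_id {} not found", from_id));
        }
        // 3. the range must be non-empty: to must be strictly greater than from
        if to_snapshot_id <= from_id {
            return Err("to_snapshot_id must be greater than from_snapshot_id".to_string());
        }
    }
    Ok(())
}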
@@ -388,8 +426,13 @@ impl TableScan {
         // used to stream the results back to the caller
         let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

-        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();
-        let delete_file_idx_and_tx = Some((delete_file_idx.clone(), delete_file_tx.clone()));
+        // For incremental scan, disable delete file processing
+        let delete_file_idx_and_tx = if plan_context.to_snapshot_id.is_some() {
+            None
+        } else {
+            let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();
+            Some((delete_file_idx.clone(), delete_file_tx.clone()))
+        };

         // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
         // whose partitions cannot match this
@@ -429,7 +472,8 @@ impl TableScan {
                 concurrency_limit_manifest_entries,
                 |(manifest_entry_context, tx)| async move {
                     spawn(async move {
-                        Self::process_delete_manifest_entry(manifest_entry_context, tx).await
+                        Self::process_delete_manifest_entry(manifest_entry_context, tx)
+                            .await
                     })
                     .await
                 },
@@ -1844,4 +1888,224 @@ pub mod tests {
         };
         test_fn(task);
     }
+
+    #[tokio::test]
+    async fn test_incremental_scan() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Get the two snapshots in the table
+        let snapshots = fixture.table.metadata().snapshots().collect::<Vec<_>>();
+
+        assert_eq!(snapshots.len(), 2, "Test fixture should have two snapshots");
+
+        // First snapshot is the parent of the second
+        let first_snapshot_id = snapshots[0].snapshot_id();
+        let second_snapshot_id = snapshots[1].snapshot_id();
+
+        // Create an incremental scan from first to second snapshot
+        let table_scan = fixture
+            .table
+            .scan()
+            .from_snapshot_id(first_snapshot_id)
+            .to_snapshot_id(second_snapshot_id)
+            .build()
+            .unwrap();
+
+        // Plan files and verify we get the expected files
+        let tasks = table_scan
+            .plan_files()
+            .await
+            .unwrap()
+            .try_fold(vec![], |mut acc, task| async move {
+                acc.push(task);
+                Ok(acc)
+            })
+            .await
+            .unwrap();
+
+        // Only files added between first and second snapshot should be included
+        // The way our test fixture is set up, the added files should be in the second snapshot
+        // For our test fixture, only one file should be returned by incremental scan
+        assert_eq!(
+            tasks.len(),
+            1,
+            "Incremental scan should return only added files between snapshots"
+        );
+
+        // Verify this is the expected file (file 1.parquet which was added in the second snapshot)
+        assert_eq!(
+            tasks[0].data_file_path,
+            format!("{}/1.parquet", &fixture.table_location),
+            "Incremental scan should return the file added in the second snapshot"
+        );
+
+        // Verify we can read the data
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        // Verify data contents from 1.parquet
+        assert_eq!(batches.len(), 1, "Should have one record batch");
+        assert_eq!(batches[0].num_rows(), 1024, "Should have 1024 rows");
+
+        // Verify content of some columns
+        let col_x = batches[0].column_by_name("x").unwrap();
+        let int64_arr = col_x.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(
+            int64_arr.value(0),
+            1,
+            "First value of column 'x' should be 1"
+        );
+
+        let col_a = batches[0].column_by_name("a").unwrap();
+        let string_arr = col_a.as_any().downcast_ref::<StringArray>().unwrap();
+        assert!(
+            string_arr.value(0) == "Apache" || string_arr.value(0) == "Iceberg",
+            "First value of column 'a' should be either 'Apache' or 'Iceberg'"
+        );
+    }
+
+    #[test]
+    fn test_invalid_incremental_scan_configurations() {
+        let table = TableTestFixture::new().table;
+
+        // Test case 1: to_snapshot_id is required for incremental scan
+        let result = table
+            .scan()
+            .from_snapshot_id(1234) // Only providing from_snapshot_id
+            .build();
+
+        assert!(
+            result.is_err(),
+            "Should fail when to_snapshot_id is not set"
+        );
+        assert_eq!(
+            result.unwrap_err().to_string(),
+            "DataInvalid => Incremental scan requires to_snapshot_id to be set. Use from_snapshot_id (exclusive) and to_snapshot_id (inclusive) to specify the range.",
+            "Should have correct error message for missing to_snapshot_id"
+        );
+
+        // Test case 2: snapshot_id should not be set with incremental scan
+        let result = table
+            .scan()
+            .snapshot_id(1234)
+            .from_snapshot_id(1234)
+            .to_snapshot_id(5678)
+            .build();
+
+        assert!(
+            result.is_err(),
+            "Should fail when snapshot_id is set with incremental scan"
+        );
+        assert_eq!(
+            result.unwrap_err().to_string(),
+            "DataInvalid => snapshot_id should not be set for incremental scan. Use from_snapshot_id and to_snapshot_id instead.",
+            "Should have correct error message for setting both snapshot_id and incremental scan parameters"
+        );
+    }
+
+    #[test]
+    fn test_incremental_scan_edge_cases() {
+        let fixture = TableTestFixture::new();
+        let table = &fixture.table;
+
+        // Test case 1: Non-existent to_snapshot_id
+        let result = table
+            .scan()
+            .to_snapshot_id(999999) // Non-existent snapshot ID
+            .build();
+
+        assert!(
+            result.is_err(),
+            "Should fail when to_snapshot_id does not exist"
+        );
+        assert_eq!(
+            result.unwrap_err().to_string(),
+            "DataInvalid => to_snapshot_id 999999 not found",
+            "Should have correct error message for non-existent to_snapshot_id"
+        );
+
+        // Test case 2: Non-existent from_snapshot_id
+        let result = table
+            .scan()
+            .from_snapshot_id(999998) // Non-existent snapshot ID
+            .to_snapshot_id(999999) // Non-existent snapshot ID
+            .build();
+
+        assert!(
+            result.is_err(),
+            "Should fail when to_snapshot_id does not exist"
+        );
+        // This should fail on to_snapshot_id first since that's checked first
+        assert_eq!(
+            result.unwrap_err().to_string(),
+            "DataInvalid => to_snapshot_id 999999 not found",
+            "Should fail on to_snapshot_id check first"
+        );
+
+        // We need to set up snapshots for the remaining tests
+        let snapshots = table.metadata().snapshots().collect::<Vec<_>>();
+        if snapshots.len() >= 2 {
+            let first_snapshot_id = snapshots[0].snapshot_id();
+            let second_snapshot_id = snapshots[1].snapshot_id();
+
+            // Test case 3: from_snapshot_id doesn't exist but to_snapshot_id does
+            let result = table
+                .scan()
+                .from_snapshot_id(999998) // Non-existent
+                .to_snapshot_id(second_snapshot_id) // Existent
+                .build();
+
+            assert!(
+                result.is_err(),
+                "Should fail when from_snapshot_id does not exist"
+            );
+            assert_eq!(
+                result.unwrap_err().to_string(),
+                "DataInvalid => from_snapshot_id 999998 not found",
+                "Should have correct error message for non-existent from_snapshot_id"
+            );
+
+            // Determine which snapshot is newer based on snapshot IDs
+            let (older_snapshot_id, newer_snapshot_id) = if first_snapshot_id < second_snapshot_id {
+                (first_snapshot_id, second_snapshot_id)
+            } else {
+                (second_snapshot_id, first_snapshot_id)
+            };
+
+            // Test case 4: Reversed snapshot order (to_snapshot_id <= from_snapshot_id)
+            let result = table
+                .scan()
+                .from_snapshot_id(newer_snapshot_id) // Later snapshot
+                .to_snapshot_id(older_snapshot_id) // Earlier snapshot
+                .build();
+
+            assert!(
+                result.is_err(),
+                "Should fail when to_snapshot_id <= from_snapshot_id"
+            );
+            assert_eq!(
+                result.unwrap_err().to_string(),
+                "DataInvalid => to_snapshot_id must be greater than from_snapshot_id",
+                "Should have correct error message for reversed snapshot order"
+            );
+
+            // Test case 5: Equal snapshot IDs (empty range)
+            let result = table
+                .scan()
+                .from_snapshot_id(older_snapshot_id)
+                .to_snapshot_id(older_snapshot_id)
+                .build();
+
+            assert!(
+                result.is_err(),
+                "Should fail when from_snapshot_id == to_snapshot_id"
+            );
+            assert_eq!(
+                result.unwrap_err().to_string(),
+                "DataInvalid => to_snapshot_id must be greater than from_snapshot_id",
+                "Should have correct error message for equal snapshot IDs"
+            );
+        }
+    }
 }
