
Commit dfc33d3

add test
Signed-off-by: xxchan <[email protected]>
1 parent 1099714 commit dfc33d3


2 files changed: 278 additions & 9 deletions

crates/iceberg/src/scan/context.rs

Lines changed: 9 additions & 4 deletions
@@ -251,9 +251,11 @@ impl PlanContext {
             matches!(entry.status(), ManifestStatus::Added)
                 && matches!(entry.data_file().content_type(), DataContentType::Data)
                 && (
-                    // Is it possible that the snapshot id here is not contained?
-                    entry.snapshot_id().is_none()
-                        || snapshot_ids.contains(&entry.snapshot_id().unwrap())
+                    entry
+                        .snapshot_id()
+                        .map(|id| snapshot_ids.contains(&id))
+                        .unwrap_or(true)
+                    // Include entries without snapshot_id
                 )
         }));

@@ -274,7 +276,10 @@ impl PlanContext {
                 };
                 (Some(delete_file_idx.clone()), tx.clone())
             } else {
-                (delete_file_idx_and_tx.as_ref().map(|(idx, _)| idx.clone()), tx_data.clone())
+                (
+                    delete_file_idx_and_tx.as_ref().map(|(idx, _)| idx.clone()),
+                    tx_data.clone(),
+                )
             };

             let partition_bound_predicate = if self.predicate.is_some() {
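
Note on the filter change above: the rewrite replaces the explicit `is_none()` / `unwrap()` pair with `Option::map(..).unwrap_or(true)`, i.e. entries without a snapshot id are kept, and entries with one are kept only if the id is in the allowed set. A minimal standalone sketch of that predicate (the function and types here are illustrative, not part of the crate):

    use std::collections::HashSet;

    // Illustrative predicate mirroring the filter above: keep an entry when it
    // has no snapshot id, or when its snapshot id is in the incremental range.
    fn keep_entry(entry_snapshot_id: Option<i64>, snapshot_ids: &HashSet<i64>) -> bool {
        entry_snapshot_id
            .map(|id| snapshot_ids.contains(&id))
            .unwrap_or(true)
    }

    fn main() {
        let ids: HashSet<i64> = [101, 102].into_iter().collect();
        assert!(keep_entry(None, &ids)); // entry without a snapshot id is kept
        assert!(keep_entry(Some(101), &ids)); // id inside the range
        assert!(!keep_entry(Some(999), &ids)); // id outside the range is filtered out
    }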

crates/iceberg/src/scan/mod.rs

Lines changed: 269 additions & 5 deletions
@@ -137,12 +137,18 @@ impl<'a> TableScanBuilder<'a> {
     }

     /// Set the starting snapshot id (exclusive) for incremental scan.
+    ///
+    /// # Behavior
+    /// - Only includes files from Append and Overwrite operations
+    /// - Excludes Replace operations (e.g., compaction)
+    /// - Only returns Added manifest entries with Data content
+    /// - Delete files are not supported in incremental scans
     pub fn from_snapshot_id(mut self, from_snapshot_id: i64) -> Self {
         self.from_snapshot_id = Some(from_snapshot_id);
         self
     }

-    /// Set the ending snapshot id (inclusive) for incremental scan.
+    /// Set the ending snapshot id (inclusive) for incremental scan (See [`Self::from_snapshot_id`]).
     pub fn to_snapshot_id(mut self, to_snapshot_id: i64) -> Self {
         self.to_snapshot_id = Some(to_snapshot_id);
         self
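
The doc comment above describes the incremental-scan contract (exclusive from, inclusive to, appended data files only, no delete files). As a rough usage sketch, the range would be consumed like this; the helper name, ids, and error handling are illustrative, while `scan()`, `from_snapshot_id`, `to_snapshot_id`, `build()`, and `plan_files()` are the methods touched by this diff:

    use futures::TryStreamExt;
    use iceberg::table::Table;
    use iceberg::Result;

    // Illustrative helper: count the scan tasks for data files appended strictly
    // after `from_id` and up to (and including) `to_id`.
    async fn plan_incremental(table: &Table, from_id: i64, to_id: i64) -> Result<usize> {
        let scan = table
            .scan()
            .from_snapshot_id(from_id) // exclusive lower bound
            .to_snapshot_id(to_id) // inclusive upper bound
            .build()?;
        let tasks: Vec<_> = scan.plan_files().await?.try_collect().await?;
        Ok(tasks.len())
    }
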
@@ -209,7 +215,7 @@ impl<'a> TableScanBuilder<'a> {
             if self.to_snapshot_id.is_none() {
                 return Err(Error::new(
                     ErrorKind::DataInvalid,
-                    "Incremental scan requires to_snapshot_id to be set",
+                    "Incremental scan requires to_snapshot_id to be set. Use from_snapshot_id (exclusive) and to_snapshot_id (inclusive) to specify the range.",
                 ));
             }

@@ -221,7 +227,39 @@ impl<'a> TableScanBuilder<'a> {
                 ));
             }

+            let to_snapshot_id = self.to_snapshot_id.unwrap();
+
+            // Validate to_snapshot_id exists
+            if self
+                .table
+                .metadata()
+                .snapshot_by_id(to_snapshot_id)
+                .is_none()
+            {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("to_snapshot_id {} not found", to_snapshot_id),
+                ));
+            }
+
+            // Validate from_snapshot_id if provided
+            if let Some(from_id) = self.from_snapshot_id {
+                // Validate from_snapshot_id exists
+                if self.table.metadata().snapshot_by_id(from_id).is_none() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("from_snapshot_id {} not found", from_id),
+                    ));
+                }

+                // Validate snapshot order
+                if to_snapshot_id <= from_id {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "to_snapshot_id must be greater than from_snapshot_id",
+                    ));
+                }
+            }
         }

         let snapshot = match self.snapshot_id {
@@ -388,8 +426,13 @@ impl TableScan {
         // used to stream the results back to the caller
         let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

-        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();
-        let delete_file_idx_and_tx = Some((delete_file_idx.clone(), delete_file_tx.clone()));
+        // For incremental scan, disable delete file processing
+        let delete_file_idx_and_tx = if plan_context.to_snapshot_id.is_some() {
+            None
+        } else {
+            let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();
+            Some((delete_file_idx.clone(), delete_file_tx.clone()))
+        };

         // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
         // whose partitions cannot match this
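
The change above turns delete-file handling off whenever an incremental range is configured: the index and its channel are only constructed for a regular scan and are carried as an `Option`, so later stages simply skip delete processing when it is `None`. A tiny standalone sketch of that gating pattern (the types and function are placeholders, not the crate's API):

    // Placeholder stand-ins for the delete-file index and its sender half.
    struct DeleteIndex;
    struct DeleteTx;

    // Build the delete-file machinery only when no incremental range is set.
    fn delete_index_for_scan(to_snapshot_id: Option<i64>) -> Option<(DeleteIndex, DeleteTx)> {
        if to_snapshot_id.is_some() {
            None // incremental scan: delete files are not supported
        } else {
            Some((DeleteIndex, DeleteTx))
        }
    }

    fn main() {
        assert!(delete_index_for_scan(Some(42)).is_none());
        assert!(delete_index_for_scan(None).is_some());
    }
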
@@ -429,7 +472,8 @@ impl TableScan {
                 concurrency_limit_manifest_entries,
                 |(manifest_entry_context, tx)| async move {
                     spawn(async move {
-                        Self::process_delete_manifest_entry(manifest_entry_context, tx).await
+                        Self::process_delete_manifest_entry(manifest_entry_context, tx)
+                            .await
                     })
                     .await
                 },
@@ -1846,4 +1890,224 @@ pub mod tests {
         };
         test_fn(task);
     }
+
+    #[tokio::test]
+    async fn test_incremental_scan() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Get the two snapshots in the table
+        let snapshots = fixture.table.metadata().snapshots().collect::<Vec<_>>();
+
+        assert_eq!(snapshots.len(), 2, "Test fixture should have two snapshots");
+
+        // First snapshot is the parent of the second
+        let first_snapshot_id = snapshots[0].snapshot_id();
+        let second_snapshot_id = snapshots[1].snapshot_id();
+
+        // Create an incremental scan from first to second snapshot
+        let table_scan = fixture
+            .table
+            .scan()
+            .from_snapshot_id(first_snapshot_id)
+            .to_snapshot_id(second_snapshot_id)
+            .build()
+            .unwrap();
+
+        // Plan files and verify we get the expected files
+        let tasks = table_scan
+            .plan_files()
+            .await
+            .unwrap()
+            .try_fold(vec![], |mut acc, task| async move {
+                acc.push(task);
+                Ok(acc)
+            })
+            .await
+            .unwrap();
+
+        // Only files added between first and second snapshot should be included
+        // The way our test fixture is set up, the added files should be in the second snapshot
+        // For our test fixture, only one file should be returned by incremental scan
+        assert_eq!(
+            tasks.len(),
+            1,
+            "Incremental scan should return only added files between snapshots"
+        );
+
+        // Verify this is the expected file (file 1.parquet which was added in the second snapshot)
+        assert_eq!(
+            tasks[0].data_file_path,
+            format!("{}/1.parquet", &fixture.table_location),
+            "Incremental scan should return the file added in the second snapshot"
+        );
+
+        // Verify we can read the data
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        // Verify data contents from 1.parquet
+        assert_eq!(batches.len(), 1, "Should have one record batch");
+        assert_eq!(batches[0].num_rows(), 1024, "Should have 1024 rows");
+
+        // Verify content of some columns
+        let col_x = batches[0].column_by_name("x").unwrap();
+        let int64_arr = col_x.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(
+            int64_arr.value(0),
+            1,
+            "First value of column 'x' should be 1"
+        );
+
+        let col_a = batches[0].column_by_name("a").unwrap();
+        let string_arr = col_a.as_any().downcast_ref::<StringArray>().unwrap();
+        assert!(
+            string_arr.value(0) == "Apache" || string_arr.value(0) == "Iceberg",
+            "First value of column 'a' should be either 'Apache' or 'Iceberg'"
+        );
+    }
+
+    #[test]
+    fn test_invalid_incremental_scan_configurations() {
+        let table = TableTestFixture::new().table;
+
+        // Test case 1: to_snapshot_id is required for incremental scan
+        let result = table
+            .scan()
+            .from_snapshot_id(1234) // Only providing from_snapshot_id
+            .build();
+
+        assert!(
+            result.is_err(),
+            "Should fail when to_snapshot_id is not set"
+        );
+        assert_eq!(
+            result.unwrap_err().to_string(),
+            "DataInvalid => Incremental scan requires to_snapshot_id to be set. Use from_snapshot_id (exclusive) and to_snapshot_id (inclusive) to specify the range.",
+            "Should have correct error message for missing to_snapshot_id"
+        );
+
+        // Test case 2: snapshot_id should not be set with incremental scan
+        let result = table
+            .scan()
+            .snapshot_id(1234)
+            .from_snapshot_id(1234)
+            .to_snapshot_id(5678)
+            .build();
+
+        assert!(
+            result.is_err(),
+            "Should fail when snapshot_id is set with incremental scan"
+        );
+        assert_eq!(
+            result.unwrap_err().to_string(),
+            "DataInvalid => snapshot_id should not be set for incremental scan. Use from_snapshot_id and to_snapshot_id instead.",
+            "Should have correct error message for setting both snapshot_id and incremental scan parameters"
+        );
+    }
+
+    #[test]
+    fn test_incremental_scan_edge_cases() {
+        let fixture = TableTestFixture::new();
+        let table = &fixture.table;
+
+        // Test case 1: Non-existent to_snapshot_id
+        let result = table
+            .scan()
+            .to_snapshot_id(999999) // Non-existent snapshot ID
+            .build();
+
+        assert!(
+            result.is_err(),
+            "Should fail when to_snapshot_id does not exist"
+        );
+        assert_eq!(
+            result.unwrap_err().to_string(),
+            "DataInvalid => to_snapshot_id 999999 not found",
+            "Should have correct error message for non-existent to_snapshot_id"
+        );
+
+        // Test case 2: Non-existent from_snapshot_id
+        let result = table
+            .scan()
+            .from_snapshot_id(999998) // Non-existent snapshot ID
+            .to_snapshot_id(999999) // Non-existent snapshot ID
+            .build();
+
+        assert!(
+            result.is_err(),
+            "Should fail when to_snapshot_id does not exist"
+        );
+        // This should fail on to_snapshot_id first since that's checked first
+        assert_eq!(
+            result.unwrap_err().to_string(),
+            "DataInvalid => to_snapshot_id 999999 not found",
+            "Should fail on to_snapshot_id check first"
+        );
+
+        // We need to set up snapshots for the remaining tests
+        let snapshots = table.metadata().snapshots().collect::<Vec<_>>();
+        if snapshots.len() >= 2 {
+            let first_snapshot_id = snapshots[0].snapshot_id();
+            let second_snapshot_id = snapshots[1].snapshot_id();
+
+            // Test case 3: from_snapshot_id doesn't exist but to_snapshot_id does
+            let result = table
+                .scan()
+                .from_snapshot_id(999998) // Non-existent
+                .to_snapshot_id(second_snapshot_id) // Existent
+                .build();
+
+            assert!(
+                result.is_err(),
+                "Should fail when from_snapshot_id does not exist"
+            );
+            assert_eq!(
+                result.unwrap_err().to_string(),
+                "DataInvalid => from_snapshot_id 999998 not found",
+                "Should have correct error message for non-existent from_snapshot_id"
+            );
+
+            // Determine which snapshot is newer based on snapshot IDs
+            let (older_snapshot_id, newer_snapshot_id) = if first_snapshot_id < second_snapshot_id {
+                (first_snapshot_id, second_snapshot_id)
+            } else {
+                (second_snapshot_id, first_snapshot_id)
+            };
+
+            // Test case 4: Reversed snapshot order (to_snapshot_id <= from_snapshot_id)
+            let result = table
+                .scan()
+                .from_snapshot_id(newer_snapshot_id) // Later snapshot
+                .to_snapshot_id(older_snapshot_id) // Earlier snapshot
+                .build();

+            assert!(
+                result.is_err(),
+                "Should fail when to_snapshot_id <= from_snapshot_id"
+            );
+            assert_eq!(
+                result.unwrap_err().to_string(),
+                "DataInvalid => to_snapshot_id must be greater than from_snapshot_id",
+                "Should have correct error message for reversed snapshot order"
+            );
+
+            // Test case 5: Equal snapshot IDs (empty range)
+            let result = table
+                .scan()
+                .from_snapshot_id(older_snapshot_id)
+                .to_snapshot_id(older_snapshot_id)
+                .build();
+
+            assert!(
+                result.is_err(),
+                "Should fail when from_snapshot_id == to_snapshot_id"
+            );
+            assert_eq!(
+                result.unwrap_err().to_string(),
+                "DataInvalid => to_snapshot_id must be greater than from_snapshot_id",
+                "Should have correct error message for equal snapshot IDs"
+            );
+        }
+    }
 }
