
Commit 16906c1

fix: stack overflow when loading large equality deletes (#1915)
## Which issue does this PR close?

- Closes #.

## What changes are included in this PR?

A stack overflow occurs when processing data files containing a large number of equality deletes (e.g., > 6000 rows). This happens because `parse_equality_deletes_record_batch_stream` previously constructed the final predicate by linearly calling `.and()` in a loop:

```rust
result_predicate = result_predicate.and(row_predicate.not());
```

This resulted in a deeply nested, left-skewed tree structure whose depth equals the number of rows (N). When `rewrite_not()` (which uses a recursive visitor pattern) was subsequently called on this structure, or when the structure was dropped, the call stack limit was exceeded.

### Changes

1. **Balanced tree construction:** Refactored the predicate combination logic. Instead of linear accumulation, row predicates are collected and combined pairwise to build a balanced tree. This reduces the tree depth from O(N) to O(log N).
2. **Early rewrite:** `rewrite_not()` is now called immediately on each individual row predicate before they are combined. This ensures already-simplified predicates are combined and avoids traversing a massive unoptimized tree later.
3. **Regression test:** Added `test_large_equality_delete_batch_stack_overflow`, which processes 20,000 equality delete rows to verify the fix.

## Are these changes tested?

- [x] New regression test `test_large_equality_delete_batch_stack_overflow` passed.
- [x] All existing tests in `arrow::caching_delete_file_loader` passed.

Co-authored-by: Renjie Liu <[email protected]>
1 parent 58bdb9f commit 16906c1
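
To illustrate the pairwise-combination idea described above, here is a minimal, self-contained sketch. It is not code from the crate: `balanced_reduce` is a hypothetical helper, and plain strings stand in for the crate's `Predicate` type.

```rust
// Sketch only: repeatedly merge adjacent elements so the resulting expression
// tree is balanced, giving depth O(log N) instead of the O(N) of a left fold.
fn balanced_reduce<T>(mut items: Vec<T>, mut combine: impl FnMut(T, T) -> T) -> Option<T> {
    while items.len() > 1 {
        let mut next_level = Vec::with_capacity(items.len().div_ceil(2));
        let mut iter = items.into_iter();
        while let Some(a) = iter.next() {
            match iter.next() {
                Some(b) => next_level.push(combine(a, b)),
                None => next_level.push(a), // odd element carried up unchanged
            }
        }
        items = next_level;
    }
    items.pop() // None when the input is empty (the patch returns AlwaysTrue in that case)
}

fn main() {
    // 20_000 "row predicates", mirroring the regression test size: a left fold
    // would nest 19_999 AND levels deep; the pairwise version needs only
    // ceil(log2(20_000)) = 15 reduction passes.
    let rows: Vec<String> = (0..20_000).map(|i| format!("y != {i}")).collect();
    let combined = balanced_reduce(rows, |a, b| format!("({a} AND {b})"));
    assert!(combined.unwrap().contains("y != 0"));
}
```

The patch applies the same reduction inline over the collected row predicates and returns `AlwaysTrue` when the delete file yields no rows, matching the `None` case of this sketch.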

File tree

1 file changed (+69 −3 lines)


crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 69 additions & 3 deletions
```diff
@@ -330,7 +330,7 @@ impl CachingDeleteFileLoader {
         mut stream: ArrowRecordBatchStream,
         equality_ids: HashSet<i32>,
     ) -> Result<Predicate> {
-        let mut result_predicate = AlwaysTrue;
+        let mut row_predicates = Vec::new();
         let mut batch_schema_iceberg: Option<Schema> = None;
         let accessor = EqDelRecordBatchPartnerAccessor;
```

```diff
@@ -374,10 +374,29 @@ impl CachingDeleteFileLoader {
                         row_predicate = row_predicate.and(cell_predicate)
                     }
                 }
-                result_predicate = result_predicate.and(row_predicate.not());
+                row_predicates.push(row_predicate.not().rewrite_not());
             }
         }
-        Ok(result_predicate.rewrite_not())
+
+        // All row predicates are combined to a single predicate by creating a balanced binary tree.
+        // Using a simple fold would result in a deeply nested predicate that can cause a stack overflow.
+        while row_predicates.len() > 1 {
+            let mut next_level = Vec::with_capacity(row_predicates.len().div_ceil(2));
+            let mut iter = row_predicates.into_iter();
+            while let Some(p1) = iter.next() {
+                if let Some(p2) = iter.next() {
+                    next_level.push(p1.and(p2));
+                } else {
+                    next_level.push(p1);
+                }
+            }
+            row_predicates = next_level;
+        }
+
+        match row_predicates.pop() {
+            Some(p) => Ok(p),
+            None => Ok(AlwaysTrue),
+        }
     }
 }

```

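The two comment lines in the hunk above carry the key reasoning. To make the depth claim concrete, here is a small illustrative sketch with a toy `Expr` enum (hypothetical, not the crate's `Predicate`): a left fold over N leaves yields a tree of depth N, while the pairwise combination yields depth roughly log2(N) + 1.

```rust
// Toy expression type for comparing tree depths; not the crate's Predicate.
enum Expr {
    Leaf(u64),
    And(Box<Expr>, Box<Expr>),
}

// Depth measured iteratively so the measurement itself cannot overflow the stack.
fn depth(root: &Expr) -> usize {
    let mut max_depth = 0;
    let mut stack = vec![(root, 1usize)];
    while let Some((node, d)) = stack.pop() {
        max_depth = max_depth.max(d);
        if let Expr::And(left, right) = node {
            stack.push((&**left, d + 1));
            stack.push((&**right, d + 1));
        }
    }
    max_depth
}

fn main() {
    let n = 16u64;

    // Linear fold: ((((leaf AND leaf) AND leaf) ...) AND leaf) -- depth grows with N.
    let linear = (1..n).fold(Expr::Leaf(0), |acc, i| {
        Expr::And(Box::new(acc), Box::new(Expr::Leaf(i)))
    });

    // Pairwise combination, as in the patch: depth grows with log2(N).
    let mut level: Vec<Expr> = (0..n).map(Expr::Leaf).collect();
    while level.len() > 1 {
        let mut next = Vec::with_capacity(level.len().div_ceil(2));
        let mut iter = level.into_iter();
        while let Some(a) = iter.next() {
            match iter.next() {
                Some(b) => next.push(Expr::And(Box::new(a), Box::new(b))),
                None => next.push(a),
            }
        }
        level = next;
    }
    let balanced = level.pop().unwrap();

    // With n = 16: linear depth = 16, balanced depth = 5.
    println!("linear = {}, balanced = {}", depth(&linear), depth(&balanced));
}
```

At the 20,000 rows exercised by the regression test below, the left-skewed tree would be 20,000 levels deep, which is what exhausted the call stack in the recursive `rewrite_not()` visitor and when the predicate was dropped.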
```diff
@@ -912,4 +931,51 @@ mod tests {
             result.err()
         );
     }
+
+    #[tokio::test]
+    async fn test_large_equality_delete_batch_stack_overflow() {
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+        let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
+
+        // Create a large batch of equality deletes
+        let num_rows = 20_000;
+        let col_y_vals: Vec<i64> = (0..num_rows).collect();
+        let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef;
+
+        let schema = Arc::new(arrow_schema::Schema::new(vec![
+            Field::new("y", arrow_schema::DataType::Int64, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+        ]));
+
+        let record_batch = RecordBatch::try_new(schema.clone(), vec![col_y]).unwrap();
+
+        // Write to file
+        let path = format!("{}/large-eq-deletes.parquet", &table_location);
+        let file = File::create(&path).unwrap();
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+        let mut writer = ArrowWriter::try_new(file, schema, Some(props)).unwrap();
+        writer.write(&record_batch).unwrap();
+        writer.close().unwrap();
+
+        let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
+        let record_batch_stream = basic_delete_file_loader
+            .parquet_to_batch_stream(&path)
+            .await
+            .expect("could not get batch stream");
+
+        let eq_ids = HashSet::from_iter(vec![2]);
+
+        let result = CachingDeleteFileLoader::parse_equality_deletes_record_batch_stream(
+            record_batch_stream,
+            eq_ids,
+        )
+        .await;
+
+        assert!(result.is_ok());
+    }
 }
```

0 commit comments
