Skip to content

Commit 50cd2ab

Browse files
authored
feat: add bulk insertion to deletion vector (#1578)
## What changes are included in this PR? In this PR, I added a bulk insertion API to the deletion vector and roaring bitmap. Context: - I work on Iceberg-related features on a daily basis, and I have been implementing my own deletion vector and puffin blob + Code reference: https://github.com/Mooncake-Labs/moonlink/blob/main/src/moonlink/src/storage/iceberg/deletion_vector.rs - I would like to leverage the upstream implementation to avoid reinventing the wheel, but I noticed some differences + My implementation supports bulk insertion, because `append` provides better performance + In my use case, all deleted rows are fetched in ascending order ## Are these changes tested? Yes, unit tests were added.
1 parent 299cfed commit 50cd2ab

File tree

1 file changed

+80
-1
lines changed

1 file changed

+80
-1
lines changed

crates/iceberg/src/delete_vector.rs

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,16 @@ use roaring::RoaringTreemap;
2121
use roaring::bitmap::Iter;
2222
use roaring::treemap::BitmapIter;
2323

24+
use crate::{Error, ErrorKind, Result};
25+
2426
#[derive(Debug, Default)]
2527
pub struct DeleteVector {
2628
inner: RoaringTreemap,
2729
}
2830

2931
impl DeleteVector {
3032
#[allow(unused)]
31-
pub(crate) fn new(roaring_treemap: RoaringTreemap) -> DeleteVector {
33+
pub fn new(roaring_treemap: RoaringTreemap) -> DeleteVector {
3234
DeleteVector {
3335
inner: roaring_treemap,
3436
}
@@ -43,6 +45,25 @@ impl DeleteVector {
4345
self.inner.insert(pos)
4446
}
4547

48+
/// Marks the given `positions` as deleted and returns the number of elements appended.
49+
///
50+
/// The input slice must be strictly ordered in ascending order, and every value must be greater than all existing values already in the set.
51+
///
52+
/// # Errors
53+
///
54+
/// Returns an error if the precondition is not met.
55+
#[allow(dead_code)]
56+
pub fn insert_positions(&mut self, positions: &[u64]) -> Result<usize> {
57+
if let Err(err) = self.inner.append(positions.iter().copied()) {
58+
return Err(Error::new(
59+
ErrorKind::PreconditionFailed,
60+
"failed to marks rows as deleted".to_string(),
61+
)
62+
.with_source(err));
63+
}
64+
Ok(positions.len())
65+
}
66+
4667
#[allow(unused)]
4768
pub fn len(&self) -> u64 {
4869
self.inner.len()
@@ -120,3 +141,61 @@ impl BitOrAssign for DeleteVector {
120141
self.inner.bitor_assign(&other.inner);
121142
}
122143
}
144+
145+
#[cfg(test)]
mod tests {
    use super::*;

    /// Testing scenario: single-position inserts report whether the position was new,
    /// and iteration yields every stored position.
    #[test]
    fn test_insertion_and_iteration() {
        let mut delete_vector = DeleteVector::default();
        assert!(delete_vector.insert(42));
        assert!(delete_vector.insert(100));
        // Inserting a position that is already present reports `false`.
        assert!(!delete_vector.insert(42));

        let mut seen = delete_vector.iter().collect::<Vec<u64>>();
        seen.sort();
        assert_eq!(seen, vec![42, 100]);
        assert_eq!(delete_vector.len(), 2);
    }

    /// Testing scenario: bulk insertion succeeds for strictly increasing positions,
    /// including a value beyond the 32-bit range.
    #[test]
    fn test_successful_insert_positions() {
        let expected: Vec<u64> = vec![1, 2, 3, 1000, 1 << 33];
        let mut delete_vector = DeleteVector::default();

        let appended = delete_vector.insert_positions(&expected).unwrap();
        assert_eq!(appended, 5);

        let mut stored = delete_vector.iter().collect::<Vec<u64>>();
        stored.sort();
        assert_eq!(stored, expected);
    }

    /// Testing scenario: bulk insertion fails because input positions are not strictly increasing.
    #[test]
    fn test_failed_insertion_unsorted_elements() {
        let mut delete_vector = DeleteVector::default();
        assert!(delete_vector.insert_positions(&[1, 3, 5, 4]).is_err());
    }

    /// Testing scenario: bulk insertion fails because input positions have intersection with existing ones.
    #[test]
    fn test_failed_insertion_with_intersection() {
        let mut delete_vector = DeleteVector::default();
        let first_batch: Vec<u64> = vec![1, 3, 5];
        assert_eq!(delete_vector.insert_positions(&first_batch).unwrap(), 3);

        // Both values fall at or below the current maximum (5), so the batch is rejected.
        assert!(delete_vector.insert_positions(&[2, 4]).is_err());
    }

    /// Testing scenario: bulk insertion fails because input positions have duplicates.
    #[test]
    fn test_failed_insertion_duplicate_elements() {
        let mut delete_vector = DeleteVector::default();
        assert!(delete_vector.insert_positions(&[1, 3, 5, 5]).is_err());
    }
}

0 commit comments

Comments
 (0)