Skip to content

Commit 36e5621

Browse files
authored
feat(function): high performance bitmap_count,bitmap_intersect without deserialization (#19149)
1 parent 5346432 commit 36e5621

File tree

9 files changed

+442
-176
lines changed

9 files changed

+442
-176
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ buf-list = "1.0.3"
170170
bumpalo = "3.12.0"
171171
byte-unit = "5.1.6"
172172
bytemuck = { version = "1", features = ["derive", "extern_crate_alloc", "must_cast", "transparentwrapper_extra"] }
173-
byteorder = "1.4.3"
173+
byteorder = "1.5.0"
174174
bytes = "1.5.0"
175175
bytesize = "2"
176176
cbordata = { version = "0.6.0" }

src/common/io/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ edition = { workspace = true }
99
[dependencies]
1010
bincode = { workspace = true }
1111
borsh = { workspace = true }
12+
byteorder = { workspace = true }
1213
bytes = { workspace = true }
1314
chrono = { workspace = true }
1415
chrono-tz = { workspace = true }
@@ -32,6 +33,7 @@ wkt = { workspace = true }
3233

3334
[dev-dependencies]
3435
aho-corasick = { workspace = true }
36+
rand = { workspace = true }
3537
rmp-serde = { workspace = true }
3638

3739
[lints]

src/common/io/src/bitmap.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ use roaring::RoaringTreemap;
2828
use roaring::treemap::Iter;
2929
use smallvec::SmallVec;
3030

31+
mod reader;
32+
3133
// https://github.com/ClickHouse/ClickHouse/blob/516a6ed6f8bd8c5f6eed3a10e9037580b2fb6152/src/AggregateFunctions/AggregateFunctionGroupBitmapData.h#L914
3234
pub const LARGE_THRESHOLD: usize = 32;
3335
pub const HYBRID_MAGIC: [u8; 2] = *b"HB";
@@ -481,7 +483,7 @@ pub fn deserialize_bitmap(buf: &[u8]) -> Result<HybridBitmap> {
481483
return result;
482484
}
483485

484-
RoaringTreemap::deserialize_from(buf)
486+
RoaringTreemap::deserialize_unchecked_from(buf)
485487
.map(HybridBitmap::from)
486488
.map_err(|e| {
487489
let len = buf.len();
@@ -490,6 +492,39 @@ pub fn deserialize_bitmap(buf: &[u8]) -> Result<HybridBitmap> {
490492
})
491493
}
492494

495+
pub fn bitmap_len(buf: &[u8]) -> Result<u64> {
496+
if buf.is_empty() {
497+
return Ok(0);
498+
}
499+
500+
if buf.len() > 3
501+
&& buf[3] == HYBRID_KIND_LARGE
502+
&& buf[..2] == HYBRID_MAGIC
503+
&& buf[2] == HYBRID_VERSION
504+
{
505+
Ok(reader::bitmap_len(&buf[HYBRID_HEADER_LEN..])? as u64)
506+
} else {
507+
Ok(deserialize_bitmap(buf)?.len())
508+
}
509+
}
510+
511+
pub fn intersection_with_serialized(lhs: &mut HybridBitmap, buf: &[u8]) -> Result<()> {
512+
if let HybridBitmap::Large(lhs) = lhs
513+
&& buf.len() > 3
514+
&& buf[3] == HYBRID_KIND_LARGE
515+
&& buf[..2] == HYBRID_MAGIC
516+
&& buf[2] == HYBRID_VERSION
517+
{
518+
Ok(reader::intersection_with_serialized(
519+
lhs,
520+
&buf[HYBRID_HEADER_LEN..],
521+
)?)
522+
} else {
523+
*lhs &= deserialize_bitmap(buf)?;
524+
Ok(())
525+
}
526+
}
527+
493528
fn try_decode_hybrid_bitmap(buf: &[u8]) -> Option<Result<HybridBitmap>> {
494529
if buf.len() < HYBRID_HEADER_LEN {
495530
return None;
@@ -511,7 +546,7 @@ fn try_decode_hybrid_bitmap(buf: &[u8]) -> Option<Result<HybridBitmap>> {
511546
match kind {
512547
HYBRID_KIND_SMALL => Some(decode_small_bitmap(payload)),
513548
HYBRID_KIND_LARGE => Some(
514-
RoaringTreemap::deserialize_from(payload)
549+
RoaringTreemap::deserialize_unchecked_from(payload)
515550
.map(HybridBitmap::from)
516551
.map_err(|e| {
517552
let len = payload.len();

src/common/io/src/bitmap/reader.rs

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::io;
16+
use std::io::Cursor;
17+
use std::io::Error;
18+
use std::io::ErrorKind;
19+
use std::io::Seek;
20+
21+
use byteorder::LittleEndian;
22+
use byteorder::ReadBytesExt;
23+
use roaring::RoaringTreemap;
24+
25+
// Sizes of header structures
26+
const DESCRIPTION_BYTES: usize = 4;
27+
const OFFSET_BYTES: usize = 4;
28+
29+
#[derive(Clone)]
30+
pub struct TreemapReader<'a> {
31+
buf: &'a [u8],
32+
_size: u64,
33+
}
34+
35+
impl<'a> TreemapReader<'a> {
36+
pub fn new(mut buf: &'a [u8]) -> io::Result<Self> {
37+
let size = buf
38+
.read_u64::<LittleEndian>()
39+
.map_err(|_| Error::other("fail to read size"))?;
40+
41+
Ok(Self { buf, _size: size })
42+
}
43+
44+
pub fn iter(&self) -> TreeMapIter<'_> {
45+
TreeMapIter {
46+
buf: self.buf,
47+
offset: 0,
48+
}
49+
}
50+
}
51+
52+
pub struct TreeMapIter<'a> {
53+
buf: &'a [u8],
54+
offset: usize,
55+
}
56+
57+
impl<'a> Iterator for TreeMapIter<'a> {
58+
type Item = io::Result<BitmapReader<'a>>;
59+
60+
fn next(&mut self) -> Option<Self::Item> {
61+
if self.buf.len() == self.offset {
62+
return None;
63+
}
64+
65+
match BitmapReader::decode(&self.buf[self.offset..]) {
66+
Ok(header) => {
67+
self.offset += header.buf.len();
68+
Some(Ok(header))
69+
}
70+
Err(err) => {
71+
self.offset = self.buf.len();
72+
Some(Err(err))
73+
}
74+
}
75+
}
76+
}
77+
78+
#[derive(Debug, Clone)]
79+
pub struct BitmapReader<'a> {
80+
prefix: u32,
81+
containers: u32,
82+
buf: &'a [u8],
83+
}
84+
85+
impl BitmapReader<'_> {
86+
pub fn decode(buf: &[u8]) -> io::Result<BitmapReader<'_>> {
87+
let mut reader = Cursor::new(buf);
88+
let prefix = reader.read_u32::<LittleEndian>()?;
89+
90+
const SERIAL_COOKIE_NO_RUNCONTAINER: u32 = 12346;
91+
const SERIAL_COOKIE: u16 = 12347;
92+
93+
// First read the cookie to determine which version of the format we are reading
94+
let containers = {
95+
let cookie = reader.read_u32::<LittleEndian>()?;
96+
if cookie == SERIAL_COOKIE_NO_RUNCONTAINER {
97+
reader.read_u32::<LittleEndian>()?
98+
} else if (cookie as u16) == SERIAL_COOKIE {
99+
return Err(Error::other("does not support run containers"));
100+
} else {
101+
return Err(Error::other("unknown cookie value"));
102+
}
103+
};
104+
105+
if containers > u16::MAX as u32 + 1 {
106+
return Err(Error::other("size is greater than supported"));
107+
}
108+
109+
let last_container = (containers - 1) as i64;
110+
reader.seek_relative(last_container * DESCRIPTION_BYTES as i64 + 2)?;
111+
let last_cardinality = reader.read_u16::<LittleEndian>()? as usize + 1;
112+
113+
reader.seek_relative(last_container * OFFSET_BYTES as i64)?;
114+
let last_offset = reader.read_u32::<LittleEndian>()?;
115+
116+
const ARRAY_LIMIT: usize = 4096;
117+
const BITMAP_LENGTH: usize = 1024;
118+
119+
let size = 4
120+
+ last_offset as usize
121+
+ if last_cardinality < ARRAY_LIMIT {
122+
2 * last_cardinality
123+
} else {
124+
BITMAP_LENGTH
125+
};
126+
127+
if buf.len() < size {
128+
Err(Error::new(
129+
ErrorKind::UnexpectedEof,
130+
"data is truncated or invalid",
131+
))
132+
} else {
133+
Ok(BitmapReader {
134+
prefix,
135+
containers,
136+
buf: &buf[..size],
137+
})
138+
}
139+
}
140+
141+
pub fn containers(&self) -> usize {
142+
self.containers as usize
143+
}
144+
145+
#[allow(dead_code)]
146+
pub fn prefix(&self) -> u32 {
147+
self.prefix
148+
}
149+
150+
pub fn description(&self, i: usize) -> io::Result<Description> {
151+
if i >= self.containers() {
152+
return Err(Error::new(ErrorKind::InvalidInput, "index out of range"));
153+
}
154+
155+
let mut desc_buf = &self.buf[12 + i * DESCRIPTION_BYTES..];
156+
let prefix = desc_buf.read_u16::<LittleEndian>()?;
157+
let cardinality = desc_buf.read_u16::<LittleEndian>()?;
158+
Ok(Description {
159+
prefix,
160+
cardinality,
161+
})
162+
}
163+
164+
pub fn bitmap_buf(&self) -> &[u8] {
165+
&self.buf[4..]
166+
}
167+
}
168+
169+
pub struct Description {
170+
#[allow(dead_code)]
171+
pub prefix: u16,
172+
cardinality: u16,
173+
}
174+
175+
impl Description {
176+
pub fn cardinality(&self) -> usize {
177+
self.cardinality as usize + 1
178+
}
179+
}
180+
181+
pub fn bitmap_len(buf: &[u8]) -> io::Result<usize> {
182+
let tree = TreemapReader::new(buf)?;
183+
let mut sum = 0;
184+
for bitmap in tree.iter() {
185+
let bitmap = bitmap?;
186+
for i in 0..bitmap.containers() {
187+
sum += bitmap.description(i)?.cardinality();
188+
}
189+
}
190+
Ok(sum)
191+
}
192+
193+
pub fn intersection_with_serialized(tree: &mut RoaringTreemap, buf: &[u8]) -> io::Result<()> {
194+
use std::cmp::Ordering::*;
195+
let rhs = TreemapReader::new(buf)?;
196+
let mut bitmaps = Vec::new();
197+
let mut lhs_iter = tree.bitmaps();
198+
let mut rhs_iter = rhs.iter();
199+
200+
let mut lhs_curr = lhs_iter.next();
201+
let mut rhs_curr = rhs_iter.next().transpose()?;
202+
203+
while let (Some((lhs_prefix, lhs_bitmap)), Some(rhs_bitmap)) = (lhs_curr, rhs_curr.as_ref()) {
204+
match lhs_prefix.cmp(&rhs_bitmap.prefix()) {
205+
Less => {
206+
lhs_curr = lhs_iter.next();
207+
}
208+
Greater => {
209+
rhs_curr = rhs_iter.next().transpose()?;
210+
}
211+
Equal => {
212+
let intersection = lhs_bitmap
213+
.intersection_with_serialized_unchecked(Cursor::new(rhs_bitmap.bitmap_buf()))?;
214+
if !intersection.is_empty() {
215+
bitmaps.push((lhs_prefix, intersection));
216+
}
217+
lhs_curr = lhs_iter.next();
218+
rhs_curr = rhs_iter.next().transpose()?;
219+
}
220+
}
221+
}
222+
223+
*tree = RoaringTreemap::from_bitmaps(bitmaps);
224+
225+
Ok(())
226+
}
227+
228+
#[cfg(test)]
229+
mod tests {
230+
231+
use rand::Rng;
232+
use rand::SeedableRng;
233+
use rand::rngs::SmallRng;
234+
use roaring::RoaringTreemap;
235+
236+
use super::*;
237+
238+
fn create_bitmap(seed: u64) -> RoaringTreemap {
239+
let mut rng = SmallRng::seed_from_u64(seed);
240+
241+
let mut bitmap = RoaringTreemap::new();
242+
for _ in 0..50 {
243+
let v = rng.r#gen::<u64>();
244+
bitmap.insert(v);
245+
}
246+
247+
for _ in 0..50 {
248+
let v = rng.r#gen::<u64>();
249+
bitmap.insert(v & u32::MAX as u64);
250+
}
251+
252+
for _ in 0..50 {
253+
let v = rng.r#gen::<u64>();
254+
bitmap.insert(v & u16::MAX as u64);
255+
}
256+
257+
bitmap
258+
}
259+
260+
#[test]
261+
fn test_len() -> io::Result<()> {
262+
let bitmap = create_bitmap(123);
263+
let mut buf = Vec::new();
264+
bitmap.serialize_into(&mut buf)?;
265+
assert_eq!(bitmap_len(&buf)?, 150);
266+
267+
Ok(())
268+
}
269+
270+
#[test]
271+
fn test_intersection() -> io::Result<()> {
272+
let v1 = create_bitmap(123);
273+
let v2 = create_bitmap(456);
274+
let v3 = create_bitmap(789);
275+
276+
let v12 = &v1 | &v2;
277+
let v23 = &v2 | &v3;
278+
279+
assert_eq!(&v12 & &v23, v2);
280+
281+
let mut buf = Vec::new();
282+
v23.serialize_into(&mut buf)?;
283+
284+
let mut v = v12;
285+
intersection_with_serialized(&mut v, &buf)?;
286+
287+
assert_eq!(v, v2);
288+
289+
Ok(())
290+
}
291+
}

0 commit comments

Comments
 (0)