Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ buf-list = "1.0.3"
bumpalo = "3.12.0"
byte-unit = "5.1.6"
bytemuck = { version = "1", features = ["derive", "extern_crate_alloc", "must_cast", "transparentwrapper_extra"] }
byteorder = "1.4.3"
byteorder = "1.5.0"
bytes = "1.5.0"
bytesize = "2"
cbordata = { version = "0.6.0" }
Expand Down
2 changes: 2 additions & 0 deletions src/common/io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition = { workspace = true }
[dependencies]
bincode = { workspace = true }
borsh = { workspace = true }
byteorder = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
chrono-tz = { workspace = true }
Expand All @@ -32,6 +33,7 @@ wkt = { workspace = true }

[dev-dependencies]
aho-corasick = { workspace = true }
rand = { workspace = true }
rmp-serde = { workspace = true }

[lints]
Expand Down
39 changes: 37 additions & 2 deletions src/common/io/src/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use roaring::RoaringTreemap;
use roaring::treemap::Iter;
use smallvec::SmallVec;

mod reader;

// https://github.com/ClickHouse/ClickHouse/blob/516a6ed6f8bd8c5f6eed3a10e9037580b2fb6152/src/AggregateFunctions/AggregateFunctionGroupBitmapData.h#L914
pub const LARGE_THRESHOLD: usize = 32;
pub const HYBRID_MAGIC: [u8; 2] = *b"HB";
Expand Down Expand Up @@ -481,7 +483,7 @@ pub fn deserialize_bitmap(buf: &[u8]) -> Result<HybridBitmap> {
return result;
}

RoaringTreemap::deserialize_from(buf)
RoaringTreemap::deserialize_unchecked_from(buf)
.map(HybridBitmap::from)
.map_err(|e| {
let len = buf.len();
Expand All @@ -490,6 +492,39 @@ pub fn deserialize_bitmap(buf: &[u8]) -> Result<HybridBitmap> {
})
}

pub fn bitmap_len(buf: &[u8]) -> Result<u64> {
if buf.is_empty() {
return Ok(0);
}

if buf.len() > 3
&& buf[3] == HYBRID_KIND_LARGE
&& buf[..2] == HYBRID_MAGIC
&& buf[2] == HYBRID_VERSION
{
Ok(reader::bitmap_len(&buf[HYBRID_HEADER_LEN..])? as u64)
} else {
Ok(deserialize_bitmap(buf)?.len())
}
}

pub fn intersection_with_serialized(lhs: &mut HybridBitmap, buf: &[u8]) -> Result<()> {
if let HybridBitmap::Large(lhs) = lhs
&& buf.len() > 3
&& buf[3] == HYBRID_KIND_LARGE
&& buf[..2] == HYBRID_MAGIC
&& buf[2] == HYBRID_VERSION
{
Ok(reader::intersection_with_serialized(
lhs,
&buf[HYBRID_HEADER_LEN..],
)?)
} else {
*lhs &= deserialize_bitmap(buf)?;
Ok(())
}
}

fn try_decode_hybrid_bitmap(buf: &[u8]) -> Option<Result<HybridBitmap>> {
if buf.len() < HYBRID_HEADER_LEN {
return None;
Expand All @@ -511,7 +546,7 @@ fn try_decode_hybrid_bitmap(buf: &[u8]) -> Option<Result<HybridBitmap>> {
match kind {
HYBRID_KIND_SMALL => Some(decode_small_bitmap(payload)),
HYBRID_KIND_LARGE => Some(
RoaringTreemap::deserialize_from(payload)
RoaringTreemap::deserialize_unchecked_from(payload)
.map(HybridBitmap::from)
.map_err(|e| {
let len = payload.len();
Expand Down
291 changes: 291 additions & 0 deletions src/common/io/src/bitmap/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io;
use std::io::Cursor;
use std::io::Error;
use std::io::ErrorKind;
use std::io::Seek;

use byteorder::LittleEndian;
use byteorder::ReadBytesExt;
use roaring::RoaringTreemap;

// Sizes of header structures
const DESCRIPTION_BYTES: usize = 4;
const OFFSET_BYTES: usize = 4;

#[derive(Clone)]
pub struct TreemapReader<'a> {
buf: &'a [u8],
_size: u64,
}

impl<'a> TreemapReader<'a> {
pub fn new(mut buf: &'a [u8]) -> io::Result<Self> {
let size = buf
.read_u64::<LittleEndian>()
.map_err(|_| Error::other("fail to read size"))?;

Ok(Self { buf, _size: size })
}

pub fn iter(&self) -> TreeMapIter<'_> {
TreeMapIter {
buf: self.buf,
offset: 0,
}
}
}

pub struct TreeMapIter<'a> {
buf: &'a [u8],
offset: usize,
}

impl<'a> Iterator for TreeMapIter<'a> {
type Item = io::Result<BitmapReader<'a>>;

fn next(&mut self) -> Option<Self::Item> {
if self.buf.len() == self.offset {
return None;
}

match BitmapReader::decode(&self.buf[self.offset..]) {
Ok(header) => {
self.offset += header.buf.len();
Some(Ok(header))
}
Err(err) => {
self.offset = self.buf.len();
Some(Err(err))
}
}
}
}

#[derive(Debug, Clone)]
pub struct BitmapReader<'a> {
prefix: u32,
containers: u32,
buf: &'a [u8],
}

impl BitmapReader<'_> {
pub fn decode(buf: &[u8]) -> io::Result<BitmapReader<'_>> {
let mut reader = Cursor::new(buf);
let prefix = reader.read_u32::<LittleEndian>()?;

const SERIAL_COOKIE_NO_RUNCONTAINER: u32 = 12346;
const SERIAL_COOKIE: u16 = 12347;

// First read the cookie to determine which version of the format we are reading
let containers = {
let cookie = reader.read_u32::<LittleEndian>()?;
if cookie == SERIAL_COOKIE_NO_RUNCONTAINER {
reader.read_u32::<LittleEndian>()?
} else if (cookie as u16) == SERIAL_COOKIE {
return Err(Error::other("does not support run containers"));
} else {
return Err(Error::other("unknown cookie value"));
}
};

if containers > u16::MAX as u32 + 1 {
return Err(Error::other("size is greater than supported"));
}

let last_container = (containers - 1) as i64;
reader.seek_relative(last_container * DESCRIPTION_BYTES as i64 + 2)?;
let last_cardinality = reader.read_u16::<LittleEndian>()? as usize + 1;

reader.seek_relative(last_container * OFFSET_BYTES as i64)?;
let last_offset = reader.read_u32::<LittleEndian>()?;

const ARRAY_LIMIT: usize = 4096;
const BITMAP_LENGTH: usize = 1024;

let size = 4
+ last_offset as usize
+ if last_cardinality < ARRAY_LIMIT {
2 * last_cardinality
} else {
BITMAP_LENGTH
};

if buf.len() < size {
Err(Error::new(
ErrorKind::UnexpectedEof,
"data is truncated or invalid",
))
} else {
Ok(BitmapReader {
prefix,
containers,
buf: &buf[..size],
})
}
}

pub fn containers(&self) -> usize {
self.containers as usize
}

#[allow(dead_code)]
pub fn prefix(&self) -> u32 {
self.prefix
}

pub fn description(&self, i: usize) -> io::Result<Description> {
if i >= self.containers() {
return Err(Error::new(ErrorKind::InvalidInput, "index out of range"));
}

let mut desc_buf = &self.buf[12 + i * DESCRIPTION_BYTES..];
let prefix = desc_buf.read_u16::<LittleEndian>()?;
let cardinality = desc_buf.read_u16::<LittleEndian>()?;
Ok(Description {
prefix,
cardinality,
})
}

pub fn bitmap_buf(&self) -> &[u8] {
&self.buf[4..]
}
}

pub struct Description {
#[allow(dead_code)]
pub prefix: u16,
cardinality: u16,
}

impl Description {
pub fn cardinality(&self) -> usize {
self.cardinality as usize + 1
}
}

pub fn bitmap_len(buf: &[u8]) -> io::Result<usize> {
let tree = TreemapReader::new(buf)?;
let mut sum = 0;
for bitmap in tree.iter() {
let bitmap = bitmap?;
for i in 0..bitmap.containers() {
sum += bitmap.description(i)?.cardinality();
}
}
Ok(sum)
}

pub fn intersection_with_serialized(tree: &mut RoaringTreemap, buf: &[u8]) -> io::Result<()> {
use std::cmp::Ordering::*;
let rhs = TreemapReader::new(buf)?;
let mut bitmaps = Vec::new();
let mut lhs_iter = tree.bitmaps();
let mut rhs_iter = rhs.iter();

let mut lhs_curr = lhs_iter.next();
let mut rhs_curr = rhs_iter.next().transpose()?;

while let (Some((lhs_prefix, lhs_bitmap)), Some(rhs_bitmap)) = (lhs_curr, rhs_curr.as_ref()) {
match lhs_prefix.cmp(&rhs_bitmap.prefix()) {
Less => {
lhs_curr = lhs_iter.next();
}
Greater => {
rhs_curr = rhs_iter.next().transpose()?;
}
Equal => {
let intersection = lhs_bitmap
.intersection_with_serialized_unchecked(Cursor::new(rhs_bitmap.bitmap_buf()))?;
if !intersection.is_empty() {
bitmaps.push((lhs_prefix, intersection));
}
lhs_curr = lhs_iter.next();
rhs_curr = rhs_iter.next().transpose()?;
}
}
}

*tree = RoaringTreemap::from_bitmaps(bitmaps);

Ok(())
}

#[cfg(test)]
mod tests {

use rand::Rng;
use rand::SeedableRng;
use rand::rngs::SmallRng;
use roaring::RoaringTreemap;

use super::*;

fn create_bitmap(seed: u64) -> RoaringTreemap {
let mut rng = SmallRng::seed_from_u64(seed);

let mut bitmap = RoaringTreemap::new();
for _ in 0..50 {
let v = rng.r#gen::<u64>();
bitmap.insert(v);
}

for _ in 0..50 {
let v = rng.r#gen::<u64>();
bitmap.insert(v & u32::MAX as u64);
}

for _ in 0..50 {
let v = rng.r#gen::<u64>();
bitmap.insert(v & u16::MAX as u64);
}

bitmap
}

#[test]
fn test_len() -> io::Result<()> {
let bitmap = create_bitmap(123);
let mut buf = Vec::new();
bitmap.serialize_into(&mut buf)?;
assert_eq!(bitmap_len(&buf)?, 150);

Ok(())
}

#[test]
fn test_intersection() -> io::Result<()> {
let v1 = create_bitmap(123);
let v2 = create_bitmap(456);
let v3 = create_bitmap(789);

let v12 = &v1 | &v2;
let v23 = &v2 | &v3;

assert_eq!(&v12 & &v23, v2);

let mut buf = Vec::new();
v23.serialize_into(&mut buf)?;

let mut v = v12;
intersection_with_serialized(&mut v, &buf)?;

assert_eq!(v, v2);

Ok(())
}
}
Loading
Loading