diff --git a/Cargo.lock b/Cargo.lock index 1982d08c8e567..1c08d8e486a33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3850,6 +3850,7 @@ dependencies = [ "aho-corasick", "bincode 2.0.1", "borsh", + "byteorder", "bytes", "chrono", "chrono-tz 0.8.6", @@ -3865,6 +3866,7 @@ dependencies = [ "jiff", "lexical-core", "micromarshal", + "rand 0.8.5", "rmp-serde", "roaring", "scroll 0.12.0", diff --git a/Cargo.toml b/Cargo.toml index e16e8641180a5..dea2119128f84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -242,7 +242,7 @@ buf-list = "1.0.3" bumpalo = "3.12.0" byte-unit = "5.1.6" bytemuck = { version = "1", features = ["derive", "extern_crate_alloc", "must_cast", "transparentwrapper_extra"] } -byteorder = "1.4.3" +byteorder = "1.5.0" bytes = "1.5.0" bytesize = "2" cbordata = { version = "0.6.0" } diff --git a/src/common/io/Cargo.toml b/src/common/io/Cargo.toml index cc99e814bdbd0..d31a7781b50b0 100644 --- a/src/common/io/Cargo.toml +++ b/src/common/io/Cargo.toml @@ -9,6 +9,7 @@ edition = { workspace = true } [dependencies] bincode = { workspace = true } borsh = { workspace = true } +byteorder = { workspace = true } bytes = { workspace = true } chrono = { workspace = true } chrono-tz = { workspace = true } @@ -32,6 +33,7 @@ wkt = { workspace = true } [dev-dependencies] aho-corasick = { workspace = true } +rand = { workspace = true } rmp-serde = { workspace = true } [lints] diff --git a/src/common/io/src/bitmap.rs b/src/common/io/src/bitmap.rs index cdb924acecb7e..67edc19613232 100644 --- a/src/common/io/src/bitmap.rs +++ b/src/common/io/src/bitmap.rs @@ -28,6 +28,8 @@ use roaring::RoaringTreemap; use roaring::treemap::Iter; use smallvec::SmallVec; +mod reader; + // https://github.com/ClickHouse/ClickHouse/blob/516a6ed6f8bd8c5f6eed3a10e9037580b2fb6152/src/AggregateFunctions/AggregateFunctionGroupBitmapData.h#L914 pub const LARGE_THRESHOLD: usize = 32; pub const HYBRID_MAGIC: [u8; 2] = *b"HB"; @@ -481,7 +483,7 @@ pub fn deserialize_bitmap(buf: &[u8]) -> Result { return result; } - RoaringTreemap::deserialize_from(buf) + RoaringTreemap::deserialize_unchecked_from(buf) .map(HybridBitmap::from) .map_err(|e| { let len = buf.len(); @@ -490,6 +492,39 @@ pub fn deserialize_bitmap(buf: &[u8]) -> Result { }) } +pub fn bitmap_len(buf: &[u8]) -> Result { + if buf.is_empty() { + return Ok(0); + } + + if buf.len() > 3 + && buf[3] == HYBRID_KIND_LARGE + && buf[..2] == HYBRID_MAGIC + && buf[2] == HYBRID_VERSION + { + Ok(reader::bitmap_len(&buf[HYBRID_HEADER_LEN..])? as u64) + } else { + Ok(deserialize_bitmap(buf)?.len()) + } +} + +pub fn intersection_with_serialized(lhs: &mut HybridBitmap, buf: &[u8]) -> Result<()> { + if let HybridBitmap::Large(lhs) = lhs + && buf.len() > 3 + && buf[3] == HYBRID_KIND_LARGE + && buf[..2] == HYBRID_MAGIC + && buf[2] == HYBRID_VERSION + { + Ok(reader::intersection_with_serialized( + lhs, + &buf[HYBRID_HEADER_LEN..], + )?) + } else { + *lhs &= deserialize_bitmap(buf)?; + Ok(()) + } +} + fn try_decode_hybrid_bitmap(buf: &[u8]) -> Option> { if buf.len() < HYBRID_HEADER_LEN { return None; @@ -511,7 +546,7 @@ fn try_decode_hybrid_bitmap(buf: &[u8]) -> Option> { match kind { HYBRID_KIND_SMALL => Some(decode_small_bitmap(payload)), HYBRID_KIND_LARGE => Some( - RoaringTreemap::deserialize_from(payload) + RoaringTreemap::deserialize_unchecked_from(payload) .map(HybridBitmap::from) .map_err(|e| { let len = payload.len(); diff --git a/src/common/io/src/bitmap/reader.rs b/src/common/io/src/bitmap/reader.rs new file mode 100644 index 0000000000000..465eac24f220b --- /dev/null +++ b/src/common/io/src/bitmap/reader.rs @@ -0,0 +1,291 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io; +use std::io::Cursor; +use std::io::Error; +use std::io::ErrorKind; +use std::io::Seek; + +use byteorder::LittleEndian; +use byteorder::ReadBytesExt; +use roaring::RoaringTreemap; + +// Sizes of header structures +const DESCRIPTION_BYTES: usize = 4; +const OFFSET_BYTES: usize = 4; + +#[derive(Clone)] +pub struct TreemapReader<'a> { + buf: &'a [u8], + _size: u64, +} + +impl<'a> TreemapReader<'a> { + pub fn new(mut buf: &'a [u8]) -> io::Result { + let size = buf + .read_u64::() + .map_err(|_| Error::other("fail to read size"))?; + + Ok(Self { buf, _size: size }) + } + + pub fn iter(&self) -> TreeMapIter<'_> { + TreeMapIter { + buf: self.buf, + offset: 0, + } + } +} + +pub struct TreeMapIter<'a> { + buf: &'a [u8], + offset: usize, +} + +impl<'a> Iterator for TreeMapIter<'a> { + type Item = io::Result>; + + fn next(&mut self) -> Option { + if self.buf.len() == self.offset { + return None; + } + + match BitmapReader::decode(&self.buf[self.offset..]) { + Ok(header) => { + self.offset += header.buf.len(); + Some(Ok(header)) + } + Err(err) => { + self.offset = self.buf.len(); + Some(Err(err)) + } + } + } +} + +#[derive(Debug, Clone)] +pub struct BitmapReader<'a> { + prefix: u32, + containers: u32, + buf: &'a [u8], +} + +impl BitmapReader<'_> { + pub fn decode(buf: &[u8]) -> io::Result> { + let mut reader = Cursor::new(buf); + let prefix = reader.read_u32::()?; + + const SERIAL_COOKIE_NO_RUNCONTAINER: u32 = 12346; + const SERIAL_COOKIE: u16 = 12347; + + // First read the cookie to determine which version of the format we are reading + let containers = { + let cookie = reader.read_u32::()?; + if cookie == SERIAL_COOKIE_NO_RUNCONTAINER { + reader.read_u32::()? + } else if (cookie as u16) == SERIAL_COOKIE { + return Err(Error::other("does not support run containers")); + } else { + return Err(Error::other("unknown cookie value")); + } + }; + + if containers > u16::MAX as u32 + 1 { + return Err(Error::other("size is greater than supported")); + } + + let last_container = (containers - 1) as i64; + reader.seek_relative(last_container * DESCRIPTION_BYTES as i64 + 2)?; + let last_cardinality = reader.read_u16::()? as usize + 1; + + reader.seek_relative(last_container * OFFSET_BYTES as i64)?; + let last_offset = reader.read_u32::()?; + + const ARRAY_LIMIT: usize = 4096; + const BITMAP_LENGTH: usize = 1024; + + let size = 4 + + last_offset as usize + + if last_cardinality < ARRAY_LIMIT { + 2 * last_cardinality + } else { + BITMAP_LENGTH + }; + + if buf.len() < size { + Err(Error::new( + ErrorKind::UnexpectedEof, + "data is truncated or invalid", + )) + } else { + Ok(BitmapReader { + prefix, + containers, + buf: &buf[..size], + }) + } + } + + pub fn containers(&self) -> usize { + self.containers as usize + } + + #[allow(dead_code)] + pub fn prefix(&self) -> u32 { + self.prefix + } + + pub fn description(&self, i: usize) -> io::Result { + if i >= self.containers() { + return Err(Error::new(ErrorKind::InvalidInput, "index out of range")); + } + + let mut desc_buf = &self.buf[12 + i * DESCRIPTION_BYTES..]; + let prefix = desc_buf.read_u16::()?; + let cardinality = desc_buf.read_u16::()?; + Ok(Description { + prefix, + cardinality, + }) + } + + pub fn bitmap_buf(&self) -> &[u8] { + &self.buf[4..] + } +} + +pub struct Description { + #[allow(dead_code)] + pub prefix: u16, + cardinality: u16, +} + +impl Description { + pub fn cardinality(&self) -> usize { + self.cardinality as usize + 1 + } +} + +pub fn bitmap_len(buf: &[u8]) -> io::Result { + let tree = TreemapReader::new(buf)?; + let mut sum = 0; + for bitmap in tree.iter() { + let bitmap = bitmap?; + for i in 0..bitmap.containers() { + sum += bitmap.description(i)?.cardinality(); + } + } + Ok(sum) +} + +pub fn intersection_with_serialized(tree: &mut RoaringTreemap, buf: &[u8]) -> io::Result<()> { + use std::cmp::Ordering::*; + let rhs = TreemapReader::new(buf)?; + let mut bitmaps = Vec::new(); + let mut lhs_iter = tree.bitmaps(); + let mut rhs_iter = rhs.iter(); + + let mut lhs_curr = lhs_iter.next(); + let mut rhs_curr = rhs_iter.next().transpose()?; + + while let (Some((lhs_prefix, lhs_bitmap)), Some(rhs_bitmap)) = (lhs_curr, rhs_curr.as_ref()) { + match lhs_prefix.cmp(&rhs_bitmap.prefix()) { + Less => { + lhs_curr = lhs_iter.next(); + } + Greater => { + rhs_curr = rhs_iter.next().transpose()?; + } + Equal => { + let intersection = lhs_bitmap + .intersection_with_serialized_unchecked(Cursor::new(rhs_bitmap.bitmap_buf()))?; + if !intersection.is_empty() { + bitmaps.push((lhs_prefix, intersection)); + } + lhs_curr = lhs_iter.next(); + rhs_curr = rhs_iter.next().transpose()?; + } + } + } + + *tree = RoaringTreemap::from_bitmaps(bitmaps); + + Ok(()) +} + +#[cfg(test)] +mod tests { + + use rand::Rng; + use rand::SeedableRng; + use rand::rngs::SmallRng; + use roaring::RoaringTreemap; + + use super::*; + + fn create_bitmap(seed: u64) -> RoaringTreemap { + let mut rng = SmallRng::seed_from_u64(seed); + + let mut bitmap = RoaringTreemap::new(); + for _ in 0..50 { + let v = rng.r#gen::(); + bitmap.insert(v); + } + + for _ in 0..50 { + let v = rng.r#gen::(); + bitmap.insert(v & u32::MAX as u64); + } + + for _ in 0..50 { + let v = rng.r#gen::(); + bitmap.insert(v & u16::MAX as u64); + } + + bitmap + } + + #[test] + fn test_len() -> io::Result<()> { + let bitmap = create_bitmap(123); + let mut buf = Vec::new(); + bitmap.serialize_into(&mut buf)?; + assert_eq!(bitmap_len(&buf)?, 150); + + Ok(()) + } + + #[test] + fn test_intersection() -> io::Result<()> { + let v1 = create_bitmap(123); + let v2 = create_bitmap(456); + let v3 = create_bitmap(789); + + let v12 = &v1 | &v2; + let v23 = &v2 | &v3; + + assert_eq!(&v12 & &v23, v2); + + let mut buf = Vec::new(); + v23.serialize_into(&mut buf)?; + + let mut v = v12; + intersection_with_serialized(&mut v, &buf)?; + + assert_eq!(v, v2); + + Ok(()) + } +} diff --git a/src/common/io/src/lib.rs b/src/common/io/src/lib.rs index 14379062b080a..fb889d313d2fb 100644 --- a/src/common/io/src/lib.rs +++ b/src/common/io/src/lib.rs @@ -32,7 +32,7 @@ mod binary_read; mod binary_write; mod bincode_serialization; -mod bitmap; +pub mod bitmap; mod borsh_serialization; pub mod cursor_ext; mod decimal; diff --git a/src/query/functions/benches/bench.rs b/src/query/functions/benches/bench.rs index ce1883e46dc4d..cc5ab84320691 100644 --- a/src/query/functions/benches/bench.rs +++ b/src/query/functions/benches/bench.rs @@ -69,7 +69,7 @@ mod dummy { } } -#[divan::bench_group(max_time = 0.5)] +#[divan::bench_group(max_time = 2)] mod bitmap { use databend_common_expression::BlockEntry; use databend_common_expression::Column; @@ -78,56 +78,38 @@ mod bitmap { use databend_common_expression::types::number::UInt64Type; use databend_common_functions::aggregates::eval_aggr; use databend_common_io::HybridBitmap; - use databend_common_io::deserialize_bitmap; - - fn expected_xor_values(rows: usize) -> Vec { - const PERIOD: usize = 15; - - fn parity_for_rows(count: usize) -> [u8; 5] { - let mut parity = [0u8; 5]; - for n in 0..count { - let mut inserted = [false; 5]; - inserted[1] = true; - inserted[n % 3] = true; - let v5 = n % 5; - inserted[v5] = true; - for (idx, flag) in inserted.iter().enumerate() { - if *flag { - parity[idx] ^= 1; - } - } - } - parity + use rand::Rng; + use rand::SeedableRng; + use rand::rngs::SmallRng; + + fn create_bitmap(rng: &mut SmallRng) -> HybridBitmap { + let mut bitmap = HybridBitmap::new(); + + for _ in 0..20 { + let v = rng.r#gen::(); + bitmap.insert(v & u16::MAX as u64); } - let block_parity = parity_for_rows(PERIOD); - let mut parity = [0u8; 5]; - let full_blocks = rows / PERIOD; - if full_blocks % 2 == 1 { - for (dst, src) in parity.iter_mut().zip(block_parity.iter()) { - *dst ^= *src; + if rng.r#gen::() % 4 != 0 { + for _ in 0..50 { + let v = rng.r#gen::(); + bitmap.insert(v); + } + + for _ in 0..50 { + let v = rng.r#gen::(); + bitmap.insert(v & u32::MAX as u64); } - } - let remainder = rows % PERIOD; - let rem_parity = parity_for_rows(remainder); - for (dst, src) in parity.iter_mut().zip(rem_parity.iter()) { - *dst ^= *src; } - parity - .iter() - .enumerate() - .filter_map(|(value, bit)| (*bit == 1).then_some(value as u64)) - .collect() + bitmap } - fn build_bitmap_column(rows: u64) -> Column { + fn build_bitmap_column(rows: u64, seed: u64) -> Column { + let mut rng = SmallRng::seed_from_u64(seed); let bitmaps = (0..rows) - .map(|number| { - let mut rb = HybridBitmap::new(); - rb.insert(1); - rb.insert(number % 3); - rb.insert(number % 5); + .map(|_| { + let rb = create_bitmap(&mut rng); let mut data = Vec::new(); rb.serialize_into(&mut data).unwrap(); @@ -161,148 +143,80 @@ mod bitmap { UInt64Type::from_data(data) } - fn eval_bitmap_result(entry: &BlockEntry, rows: usize, agg_name: &'static str) -> HybridBitmap { - let (result_column, _) = - eval_aggr(agg_name, vec![], std::slice::from_ref(entry), rows, vec![]) - .unwrap_or_else(|_| panic!("{agg_name} evaluation failed")); - - let Column::Bitmap(result) = result_column.remove_nullable() else { - panic!("{agg_name} should return a Bitmap column"); - }; - let Some(bytes) = result.index(0) else { - panic!("{agg_name} should return exactly one row"); - }; - deserialize_bitmap(bytes).expect("deserialize bitmap result") - } - - fn run_bitmap_result_bench( - bencher: divan::Bencher, - rows: usize, - agg_name: &'static str, - entry: &BlockEntry, - validator: F, - ) where - F: Fn(&HybridBitmap) + Sync, - { - bencher.bench(|| { - let rb = eval_bitmap_result(entry, rows, agg_name); - validator(&rb); - }); + fn eval_bitmap_result(entry: &BlockEntry, rows: usize, agg_name: &'static str) { + let _ = eval_aggr(agg_name, vec![], std::slice::from_ref(entry), rows, vec![]) + .unwrap_or_else(|_| panic!("{agg_name} evaluation failed")); } - #[divan::bench(args = [100_000, 1_000_000])] + #[divan::bench(args = [1000, 65535])] fn bitmap_intersect(bencher: divan::Bencher, rows: usize) { // Emulate `CREATE TABLE ... AS SELECT build_bitmap` // followed by `SELECT bitmap_intersect(a) FROM c`. - let column = build_bitmap_column(rows as u64); - let entry: BlockEntry = column.into(); + let column = build_bitmap_column(rows as u64, 125); + let entry = column.into(); - run_bitmap_result_bench(bencher, rows, "bitmap_intersect", &entry, |rb| { - assert_eq!(rb.len(), 1); - assert!(rb.contains(1)); + bencher.bench(|| { + eval_bitmap_result(&entry, rows, "bitmap_intersect"); }); } - #[divan::bench(args = [100_000, 1_000_000])] + #[divan::bench(args = [1000, 3000, 5000])] fn bitmap_union(bencher: divan::Bencher, rows: usize) { - let column = build_bitmap_column(rows as u64); - let entry: BlockEntry = column.into(); - - run_bitmap_result_bench(bencher, rows, "bitmap_union", &entry, |rb| { - assert_eq!(rb.len(), 5); - for value in 0..5 { - assert!(rb.contains(value), "bitmap_union missing {value}"); - } - }); - } + let column = build_bitmap_column(rows as u64, 785); + let entry = column.into(); - #[divan::bench(args = [100_000, 1_000_000])] - fn bitmap_or_agg(bencher: divan::Bencher, rows: usize) { - let column = build_bitmap_column(rows as u64); - let entry: BlockEntry = column.into(); - - run_bitmap_result_bench(bencher, rows, "bitmap_or_agg", &entry, |rb| { - assert_eq!(rb.len(), 5); - for value in 0..5 { - assert!(rb.contains(value), "bitmap_or_agg missing {value}"); - } + bencher.bench(|| { + eval_bitmap_result(&entry, rows, "bitmap_union"); }); } - #[divan::bench(args = [100_000, 1_000_000])] - fn bitmap_and_agg(bencher: divan::Bencher, rows: usize) { - let column = build_bitmap_column(rows as u64); - let entry: BlockEntry = column.into(); + #[divan::bench(args = [1000, 3000, 5000])] + fn bitmap_xor_agg_overlap(bencher: divan::Bencher, rows: usize) { + let column = build_bitmap_column(rows as u64, 778); + let entry = column.into(); - run_bitmap_result_bench(bencher, rows, "bitmap_and_agg", &entry, |rb| { - assert_eq!(rb.len(), 1); - assert!(rb.contains(1)); + bencher.bench(|| { + eval_bitmap_result(&entry, rows, "bitmap_xor_agg"); }); } #[divan::bench(args = [100_000, 1_000_000])] fn bitmap_intersect_empty(bencher: divan::Bencher, rows: usize) { let column = build_disjoint_bitmap_column(rows as u64); - let entry: BlockEntry = column.into(); + let entry = column.into(); - run_bitmap_result_bench(bencher, rows, "bitmap_intersect", &entry, |rb| { - assert_eq!(rb.len(), 0, "intersection should be empty"); + bencher.bench(|| { + eval_bitmap_result(&entry, rows, "bitmap_intersect"); }); } #[divan::bench(args = [100_000, 1_000_000])] fn bitmap_union_disjoint(bencher: divan::Bencher, rows: usize) { let column = build_disjoint_bitmap_column(rows as u64); - let entry: BlockEntry = column.into(); - - run_bitmap_result_bench(bencher, rows, "bitmap_union", &entry, |rb| { - let expected = rows as u64 * 2; - assert_eq!(rb.len(), expected); - if expected > 0 { - assert!(rb.contains(0)); - assert!(rb.contains(expected - 1)); - } + let entry = column.into(); + + bencher.bench(|| { + eval_bitmap_result(&entry, rows, "bitmap_union"); }); } #[divan::bench(args = [100_000, 1_000_000])] fn bitmap_xor_agg(bencher: divan::Bencher, rows: usize) { let column = build_disjoint_bitmap_column(rows as u64); - let entry: BlockEntry = column.into(); - - run_bitmap_result_bench(bencher, rows, "bitmap_xor_agg", &entry, |rb| { - let expected = rows as u64 * 2; - assert_eq!(rb.len(), expected); - if expected > 0 { - assert!(rb.contains(0)); - assert!(rb.contains(expected - 1)); - } - }); - } - - #[divan::bench(args = [100_000, 1_000_000])] - fn bitmap_xor_agg_overlap(bencher: divan::Bencher, rows: usize) { - let column = build_bitmap_column(rows as u64); - let entry: BlockEntry = column.into(); + let entry = column.into(); - run_bitmap_result_bench(bencher, rows, "bitmap_xor_agg", &entry, |rb| { - let expected = expected_xor_values(rows); - let actual: Vec = rb.iter().collect(); - assert_eq!(actual, expected); + bencher.bench(|| { + eval_bitmap_result(&entry, rows, "bitmap_xor_agg"); }); } #[divan::bench(args = [100_000, 1_000_000])] fn bitmap_construct_agg_dense(bencher: divan::Bencher, rows: usize) { let column = build_uint64_column(rows, |value| value); - let entry: BlockEntry = column.into(); + let entry = column.into(); - run_bitmap_result_bench(bencher, rows, "bitmap_construct_agg", &entry, |rb| { - let expected = rows as u64; - assert_eq!(rb.len(), expected); - if expected > 0 { - assert!(rb.contains(expected / 2)); - } + bencher.bench(|| { + eval_bitmap_result(&entry, rows, "bitmap_construct_agg"); }); } @@ -310,14 +224,10 @@ mod bitmap { fn bitmap_construct_agg_repeating(bencher: divan::Bencher, rows: usize) { const CARDINALITY: u64 = 1024; let column = build_uint64_column(rows, |value| value % CARDINALITY); - let entry: BlockEntry = column.into(); + let entry = column.into(); - run_bitmap_result_bench(bencher, rows, "bitmap_construct_agg", &entry, |rb| { - let expected = CARDINALITY.min(rows as u64); - assert_eq!(rb.len(), expected); - if expected > 0 { - assert!(rb.contains(expected - 1)); - } + bencher.bench(|| { + eval_bitmap_result(&entry, rows, "bitmap_construct_agg"); }); } } diff --git a/src/query/functions/src/aggregates/aggregate_bitmap.rs b/src/query/functions/src/aggregates/aggregate_bitmap.rs index f617125701834..76c3d1df1a153 100644 --- a/src/query/functions/src/aggregates/aggregate_bitmap.rs +++ b/src/query/functions/src/aggregates/aggregate_bitmap.rs @@ -41,6 +41,7 @@ use databend_common_expression::with_decimal_mapped_type; use databend_common_expression::with_number_mapped_type; use databend_common_expression::with_unsigned_integer_mapped_type; use databend_common_io::HybridBitmap; +use databend_common_io::bitmap::intersection_with_serialized; use databend_common_io::prelude::BinaryWrite; use num_traits::AsPrimitive; @@ -153,6 +154,8 @@ macro_rules! with_bitmap_op_mapped_type { trait BitmapOperate: Send + Sync + 'static { fn operate(lhs: &mut HybridBitmap, rhs: HybridBitmap); + + fn operate_buf(lhs: &mut HybridBitmap, buf: &[u8]) -> Result<()>; } struct BitmapAndOp; @@ -167,24 +170,43 @@ impl BitmapOperate for BitmapAndOp { fn operate(lhs: &mut HybridBitmap, rhs: HybridBitmap) { lhs.bitand_assign(rhs); } + + fn operate_buf(lhs: &mut HybridBitmap, buf: &[u8]) -> Result<()> { + intersection_with_serialized(lhs, buf) + } } impl BitmapOperate for BitmapOrOp { fn operate(lhs: &mut HybridBitmap, rhs: HybridBitmap) { lhs.bitor_assign(rhs); } + + fn operate_buf(lhs: &mut HybridBitmap, buf: &[u8]) -> Result<()> { + lhs.bitor_assign(deserialize_bitmap(buf)?); + Ok(()) + } } impl BitmapOperate for BitmapXorOp { fn operate(lhs: &mut HybridBitmap, rhs: HybridBitmap) { lhs.bitxor_assign(rhs); } + + fn operate_buf(lhs: &mut HybridBitmap, buf: &[u8]) -> Result<()> { + lhs.bitxor_assign(deserialize_bitmap(buf)?); + Ok(()) + } } impl BitmapOperate for BitmapNotOp { fn operate(lhs: &mut HybridBitmap, rhs: HybridBitmap) { lhs.sub_assign(rhs); } + + fn operate_buf(lhs: &mut HybridBitmap, buf: &[u8]) -> Result<()> { + lhs.sub_assign(deserialize_bitmap(buf)?); + Ok(()) + } } struct BitmapAggState { @@ -209,7 +231,17 @@ impl BitmapAggState { } } - fn add(&mut self, other: HybridBitmap) { + fn add(&mut self, other: &[u8]) -> Result<()> { + match &mut self.rb { + Some(v) => OP::operate_buf(v, other), + None => { + self.rb = Some(deserialize_bitmap(other)?); + Ok(()) + } + } + } + + fn add_bitmap(&mut self, other: HybridBitmap) { match &mut self.rb { Some(v) => { OP::operate(v, other); @@ -265,13 +297,11 @@ where if !valid { continue; } - let rb = deserialize_bitmap(data)?; - state.add::(rb); + state.add::(data)?; } } else { for data in view.iter() { - let rb = deserialize_bitmap(data)?; - state.add::(rb); + state.add::(data)?; } } Ok(()) @@ -288,8 +318,7 @@ where for (data, addr) in view.iter().zip(places.iter().cloned()) { let state = AggrState::new(addr, loc).get::(); - let rb = deserialize_bitmap(data)?; - state.add::(rb); + state.add::(data)?; } Ok(()) } @@ -298,8 +327,7 @@ where let view = entries[0].downcast::().unwrap(); let state = place.get::(); if let Some(data) = view.index(row) { - let rb = deserialize_bitmap(data)?; - state.add::(rb); + state.add::(data)?; } Ok(()) } @@ -344,8 +372,7 @@ where let flag = data[0]; data.consume(1); if flag == 1 { - let rb = deserialize_bitmap(data)?; - state.add::(rb); + state.add::(data)?; } } } else { @@ -355,8 +382,7 @@ where let flag = data[0]; data.consume(1); if flag == 1 { - let rb = deserialize_bitmap(data)?; - state.add::(rb); + state.add::(data)?; } } } @@ -368,7 +394,7 @@ where let other = rhs.get::(); if let Some(rb) = other.rb.take() { - state.add::(rb); + state.add_bitmap::(rb); } Ok(()) } @@ -532,8 +558,7 @@ where let flag = data[0]; data.consume(1); if flag == 1 { - let rb = deserialize_bitmap(data)?; - state.add::(rb); + state.add::(data)?; } } } else { @@ -542,8 +567,7 @@ where let flag = data[0]; data.consume(1); if flag == 1 { - let rb = deserialize_bitmap(data)?; - state.add::(rb); + state.add::(data)?; } } } @@ -555,7 +579,7 @@ where let other = rhs.get::(); if let Some(rb) = other.rb.take() { - state.add::(rb); + state.add_bitmap::(rb); } Ok(()) } diff --git a/src/query/functions/src/scalars/bitmap.rs b/src/query/functions/src/scalars/bitmap.rs index c49e1983139f5..1ff1dab29b68e 100644 --- a/src/query/functions/src/scalars/bitmap.rs +++ b/src/query/functions/src/scalars/bitmap.rs @@ -37,6 +37,7 @@ use databend_common_expression::vectorize_with_builder_3_arg; use databend_common_expression::with_signed_integer_mapped_type; use databend_common_expression::with_unsigned_integer_mapped_type; use databend_common_io::HybridBitmap; +use databend_common_io::bitmap::bitmap_len; use databend_common_io::deserialize_bitmap; use databend_common_io::parse_bitmap; use itertools::join; @@ -154,9 +155,10 @@ pub fn register(registry: &mut FunctionRegistry) { return; } } - match deserialize_bitmap(arg) { - Ok(rb) => { - builder.push(rb.len()); + + match bitmap_len(arg) { + Ok(n) => { + builder.push(n); } Err(e) => { ctx.set_error(builder.len(), e.to_string());