Skip to content

Commit 74b2b87

Browse files
ZENOTME and tisonkun authored
feat(theta): compact sketch serde v3/v4 (#77)
Co-authored-by: tison <wander4096@gmail.com>
1 parent 761491e commit 74b2b87

File tree

17 files changed

+6404
-91
lines changed

17 files changed

+6404
-91
lines changed

datasketches/src/bloom/sketch.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use std::hash::Hasher;
2121
use crate::codec::SketchBytes;
2222
use crate::codec::SketchSlice;
2323
use crate::codec::family::Family;
24+
use crate::codec::utility::ensure_preamble_longs_in_range;
25+
use crate::codec::utility::ensure_serial_version_is;
2426
use crate::error::Error;
2527
use crate::hash::XxHash64;
2628

@@ -412,22 +414,11 @@ impl BloomFilter {
412414

413415
// Validate
414416
Family::BLOOMFILTER.validate_id(family_id)?;
415-
if serial_version != SERIAL_VERSION {
416-
return Err(Error::unsupported_serial_version(
417-
SERIAL_VERSION,
418-
serial_version,
419-
));
420-
}
421-
if !(Family::BLOOMFILTER.min_pre_longs..=Family::BLOOMFILTER.max_pre_longs)
422-
.contains(&preamble_longs)
423-
{
424-
return Err(Error::deserial(format!(
425-
"invalid preamble longs: expected [{}, {}], got {}",
426-
Family::BLOOMFILTER.min_pre_longs,
427-
Family::BLOOMFILTER.max_pre_longs,
428-
preamble_longs
429-
)));
430-
}
417+
ensure_serial_version_is(SERIAL_VERSION, serial_version)?;
418+
ensure_preamble_longs_in_range(
419+
Family::BLOOMFILTER.min_pre_longs..=Family::BLOOMFILTER.max_pre_longs,
420+
preamble_longs,
421+
)?;
431422

432423
let is_empty = (flags & EMPTY_FLAG_MASK) != 0;
433424

datasketches/src/codec/family.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ pub struct Family {
3333
}
3434

3535
impl Family {
36+
/// Theta Sketch for cardinality estimation.
37+
pub const THETA: Family = Family {
38+
id: 3,
39+
name: "THETA",
40+
min_pre_longs: 1,
41+
max_pre_longs: 3,
42+
};
43+
3644
/// The HLL family of sketches.
3745
pub const HLL: Family = Family {
3846
id: 7,

datasketches/src/codec/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,4 @@ pub use self::encode::SketchBytes;
2525

2626
// private to datasketches crate
2727
pub(crate) mod family;
28+
pub(crate) mod utility;

datasketches/src/codec/utility.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::Bound;
19+
use std::ops::RangeBounds;
20+
21+
use crate::error::Error;
22+
23+
/// Checks that the serial version found in a serialized sketch is the supported one.
///
/// Returns `Ok(())` when `actual` equals `expected`.
///
/// # Errors
///
/// Returns an `Error::deserial` error whose message names both the expected and the
/// actual serial version when they differ.
pub(crate) fn ensure_serial_version_is(expected: u8, actual: u8) -> Result<(), Error> {
24+
if expected == actual {
25+
Ok(())
26+
} else {
27+
Err(Error::deserial(format!(
28+
"unsupported serial version: expected {expected}, got {actual}"
29+
)))
30+
}
31+
}
32+
33+
/// Checks that `actual` is one of the accepted preamble-longs values listed in `expected`.
///
/// Returns `Ok(())` when the `expected` slice contains `actual`.
///
/// # Errors
///
/// Returns the error built by `Error::invalid_preamble_longs(expected, actual)` when
/// `actual` is not in the slice.
pub(crate) fn ensure_preamble_longs_in(expected: &[u8], actual: u8) -> Result<(), Error> {
34+
if expected.contains(&actual) {
35+
Ok(())
36+
} else {
37+
Err(Error::invalid_preamble_longs(expected, actual))
38+
}
39+
}
40+
41+
/// Checks that `actual` lies inside the `expected` range of preamble-longs values.
///
/// `expected` may be any `RangeBounds<u8>` (for example `min..=max`). Returns `Ok(())`
/// when the range contains `actual`.
///
/// # Errors
///
/// Returns an `Error::deserial` error when `actual` is outside the range. The message
/// renders the range in interval notation for two-sided ranges (`[a, b]`, `[a, b)`,
/// `(a, b]`, `(a, b)`) and in words for one-sided ranges ("at most", "less than",
/// "at least", "greater than").
pub(crate) fn ensure_preamble_longs_in_range(
42+
expected: impl RangeBounds<u8>,
43+
actual: u8,
44+
) -> Result<(), Error> {
45+
let start = expected.start_bound();
46+
let end = expected.end_bound();
47+
if expected.contains(&actual) {
48+
Ok(())
49+
} else {
50+
Err(Error::deserial(format!(
51+
"invalid preamble longs: expected {}, got {actual}",
52+
match (start, end) {
53+
(Bound::Included(a), Bound::Included(b)) => format!("[{a}, {b}]"),
54+
(Bound::Included(a), Bound::Excluded(b)) => format!("[{a}, {b})"),
55+
(Bound::Excluded(a), Bound::Included(b)) => format!("({a}, {b}]"),
56+
(Bound::Excluded(a), Bound::Excluded(b)) => format!("({a}, {b})"),
57+
(Bound::Unbounded, Bound::Included(b)) => format!("at most {b}"),
58+
(Bound::Unbounded, Bound::Excluded(b)) => format!("less than {b}"),
59+
(Bound::Included(a), Bound::Unbounded) => format!("at least {a}"),
60+
(Bound::Excluded(a), Bound::Unbounded) => format!("greater than {a}"),
// A range unbounded on both sides contains every value, so the failure branch
// can never be entered with it.
61+
(Bound::Unbounded, Bound::Unbounded) => unreachable!("unbounded range"),
62+
}
63+
)))
64+
}
65+
}

datasketches/src/common/binomial_bounds.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ pub(crate) fn lower_bound(
302302
/// # Arguments
303303
///
304304
/// * `num_samples` - The number of samples in the sample set.
305-
/// * `theta` - The sampling probability. Must be in the range (0.0, 1.0].
305+
/// * `theta` - The sampling probability. Must be in the range `(0.0, 1.0]`.
306306
/// * `num_std_dev` - The number of standard deviations for confidence bounds.
307307
/// * `no_data_seen` - This is normally false. However, in the case where you have zero samples and
308308
/// a theta < 1.0, this flag enables the distinction between a virgin case when no actual data has
@@ -315,7 +315,7 @@ pub(crate) fn lower_bound(
315315
///
316316
/// # Errors
317317
///
318-
/// Returns an error if `theta` is not in the range (0.0, 1.0].
318+
/// Returns an error if `theta` is not in the range `(0.0, 1.0]`.
319319
pub(crate) fn upper_bound(
320320
num_samples: u64,
321321
theta: f64,

datasketches/src/countmin/sketch.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use std::hash::Hasher;
2121
use crate::codec::SketchBytes;
2222
use crate::codec::SketchSlice;
2323
use crate::codec::family::Family;
24+
use crate::codec::utility::ensure_preamble_longs_in;
25+
use crate::codec::utility::ensure_serial_version_is;
2426
use crate::countmin::CountMinValue;
2527
use crate::countmin::UnsignedCountMinValue;
2628
use crate::countmin::serialization::FLAGS_IS_EMPTY;
@@ -350,18 +352,8 @@ impl<T: CountMinValue> CountMinSketch<T> {
350352
cursor.read_u32_le().map_err(make_error("<unused>"))?;
351353

352354
Family::COUNTMIN.validate_id(family_id)?;
353-
if serial_version != SERIAL_VERSION {
354-
return Err(Error::unsupported_serial_version(
355-
SERIAL_VERSION,
356-
serial_version,
357-
));
358-
}
359-
if preamble_longs != PREAMBLE_LONGS_SHORT {
360-
return Err(Error::invalid_preamble_longs(
361-
PREAMBLE_LONGS_SHORT,
362-
preamble_longs,
363-
));
364-
}
355+
ensure_serial_version_is(SERIAL_VERSION, serial_version)?;
356+
ensure_preamble_longs_in(&[PREAMBLE_LONGS_SHORT], preamble_longs)?;
365357

366358
let num_buckets = cursor.read_u32_le().map_err(make_error("num_buckets"))?;
367359
let num_hashes = cursor.read_u8().map_err(make_error("num_hashes"))?;

datasketches/src/cpc/sketch.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ use std::hash::Hash;
2020
use crate::codec::SketchBytes;
2121
use crate::codec::SketchSlice;
2222
use crate::codec::family::Family;
23+
use crate::codec::utility::ensure_preamble_longs_in;
24+
use crate::codec::utility::ensure_serial_version_is;
2325
use crate::common::NumStdDev;
2426
use crate::common::canonical_double;
2527
use crate::common::inv_pow2_table::INVERSE_POWERS_OF_2;
@@ -518,12 +520,7 @@ impl CpcSketch {
518520
let serial_version = cursor.read_u8().map_err(make_error("serial_version"))?;
519521
let family_id = cursor.read_u8().map_err(make_error("family_id"))?;
520522
Family::CPC.validate_id(family_id)?;
521-
if serial_version != SERIAL_VERSION {
522-
return Err(Error::unsupported_serial_version(
523-
SERIAL_VERSION,
524-
serial_version,
525-
));
526-
}
523+
ensure_serial_version_is(SERIAL_VERSION, serial_version)?;
527524

528525
let lg_k = cursor.read_u8().map_err(make_error("lg_k"))?;
529526
let first_interesting_column = cursor
@@ -594,12 +591,7 @@ impl CpcSketch {
594591

595592
let expected_preamble_ints =
596593
make_preamble_ints(num_coupons, has_hip, has_table, has_window);
597-
if preamble_ints != expected_preamble_ints {
598-
return Err(Error::invalid_preamble_longs(
599-
expected_preamble_ints,
600-
preamble_ints,
601-
));
602-
}
594+
ensure_preamble_longs_in(&[expected_preamble_ints], preamble_ints)?;
603595
if seed_hash != compute_seed_hash(seed) {
604596
return Err(Error::new(
605597
ErrorKind::InvalidData,

datasketches/src/error.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -113,15 +113,9 @@ impl Error {
113113
))
114114
}
115115

116-
pub(crate) fn unsupported_serial_version(expected: u8, actual: u8) -> Self {
117-
Self::deserial(format!(
118-
"unsupported serial version: expected {expected}, got {actual}"
119-
))
120-
}
121-
122-
pub(crate) fn invalid_preamble_longs(expected: u8, actual: u8) -> Self {
123-
Self::deserial(format!(
124-
"invalid preamble longs: expected {expected}, got {actual}"
116+
pub(crate) fn invalid_preamble_longs(expected: &[u8], actual: u8) -> Self {
117+
Error::deserial(format!(
118+
"invalid preamble longs: expected {expected:?}, got {actual}"
125119
))
126120
}
127121
}

datasketches/src/frequencies/sketch.rs

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use std::hash::Hash;
2222
use crate::codec::SketchBytes;
2323
use crate::codec::SketchSlice;
2424
use crate::codec::family::Family;
25+
use crate::codec::utility::ensure_preamble_longs_in;
26+
use crate::codec::utility::ensure_serial_version_is;
2527
use crate::error::Error;
2628
use crate::frequencies::reverse_purge_item_hash_map::ReversePurgeItemHashMap;
2729
use crate::frequencies::serialization::*;
@@ -141,7 +143,7 @@ impl<T: Eq + Hash> FrequentItemsSketch<T> {
141143

142144
/// Returns the estimated frequency for an item.
143145
///
144-
/// If the item is tracked, this is `item_count + offset`. Otherwise it is zero.
146+
/// If the item is tracked, this is `item_count + offset`. Otherwise, it is zero.
145147
///
146148
/// # Examples
147149
///
@@ -464,35 +466,18 @@ impl<T: Eq + Hash> FrequentItemsSketch<T> {
464466
cursor.read_u16_le().map_err(make_error("<unused>"))?;
465467

466468
Family::FREQUENCY.validate_id(family)?;
467-
if serial_version != SERIAL_VERSION {
468-
return Err(Error::unsupported_serial_version(
469-
SERIAL_VERSION,
470-
serial_version,
471-
));
472-
}
469+
ensure_serial_version_is(SERIAL_VERSION, serial_version)?;
473470
if lg_cur > lg_max {
474471
return Err(Error::deserial("lg_cur_map_size exceeds lg_max_map_size"));
475472
}
476473

477474
let is_empty = (flags & EMPTY_FLAG_MASK) != 0;
478475
if is_empty {
479-
return if pre_longs != PREAMBLE_LONGS_EMPTY {
480-
Err(Error::invalid_preamble_longs(
481-
PREAMBLE_LONGS_EMPTY,
482-
pre_longs,
483-
))
484-
} else {
485-
Ok(Self::with_lg_map_sizes(lg_max, lg_cur))
486-
};
487-
}
488-
489-
if pre_longs != PREAMBLE_LONGS_NONEMPTY {
490-
return Err(Error::invalid_preamble_longs(
491-
PREAMBLE_LONGS_NONEMPTY,
492-
pre_longs,
493-
));
476+
ensure_preamble_longs_in(&[PREAMBLE_LONGS_EMPTY], pre_longs)?;
477+
return Ok(Self::with_lg_map_sizes(lg_max, lg_cur));
494478
}
495479

480+
ensure_preamble_longs_in(&[PREAMBLE_LONGS_NONEMPTY], pre_longs)?;
496481
let active_items = cursor.read_u32_le().map_err(make_error("active_items"))?;
497482
let active_items = active_items as usize;
498483
cursor.read_u32_le().map_err(make_error("<unused>"))?;

datasketches/src/hll/sketch.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::hash::Hash;
2424

2525
use crate::codec::SketchSlice;
2626
use crate::codec::family::Family;
27+
use crate::codec::utility::ensure_serial_version_is;
2728
use crate::common::NumStdDev;
2829
use crate::error::Error;
2930
use crate::hll::HllType;
@@ -281,12 +282,7 @@ impl HllSketch {
281282
Family::HLL.validate_id(family_id)?;
282283

283284
// Verify serialization version
284-
if serial_version != SERIAL_VERSION {
285-
return Err(Error::unsupported_serial_version(
286-
SERIAL_VERSION,
287-
serial_version,
288-
));
289-
}
285+
ensure_serial_version_is(SERIAL_VERSION, serial_version)?;
290286

291287
// Verify lg_k range (4-21 are valid)
292288
if !(4..=21).contains(&lg_config_k) {

0 commit comments

Comments
 (0)