Skip to content

Commit 020077c

Browse files
committed
feat: Support HLL
chore: working chore: deserialize chore: working chore: working chore: more tests chore: support target hll type chore: support target hll type for new sketch
1 parent a82fd13 commit 020077c

File tree

7 files changed

+404
-7
lines changed

7 files changed

+404
-7
lines changed

build.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ fn main() {
99
bridge
1010
.files(&[
1111
datasketches.join("cpc.cpp"),
12+
datasketches.join("hll.cpp"),
1213
datasketches.join("theta.cpp"),
1314
datasketches.join("hh.cpp"),
1415
])

datasketches-cpp/hll.cpp

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#include <cstdint>
2+
#include <ios>
3+
#include <sstream>
4+
#include <iostream>
5+
6+
#include "rust/cxx.h"
7+
#include "hll/include/hll.hpp"
8+
9+
#include "hll.hpp"
10+
11+
// Creates a wrapper around a freshly constructed datasketches::hll_sketch,
// forwarding lg_k and tgt_type unchanged to that sketch's constructor.
OpaqueHLLSketch::OpaqueHLLSketch(unsigned lg_k, datasketches::target_hll_type tgt_type)
    : inner_(datasketches::hll_sketch(lg_k, tgt_type)) {}
14+
15+
// Takes ownership of an existing hll_sketch by moving it into the wrapper.
OpaqueHLLSketch::OpaqueHLLSketch(datasketches::hll_sketch&& hll)
    : inner_(std::move(hll)) {}
18+
19+
// Builds the wrapper from serialized sketch bytes read from `is`, using
// datasketches::hll_sketch::deserialize.
OpaqueHLLSketch::OpaqueHLLSketch(std::istream& is)
    : inner_(datasketches::hll_sketch::deserialize(is)) {}
22+
23+
double OpaqueHLLSketch::estimate() const {
24+
return this->inner_.get_estimate();
25+
}
26+
27+
// Feeds one item, given as a raw byte slice borrowed from Rust, into the
// wrapped sketch. The slice's pointer and length are passed through as-is.
void OpaqueHLLSketch::update(rust::Slice<const uint8_t> buf) {
    const auto* bytes = buf.data();
    const auto length = buf.size();
    inner_.update(bytes, length);
}
30+
31+
void OpaqueHLLSketch::update_u64(uint64_t value) {
32+
this->inner_.update(value);
33+
}
34+
35+
std::unique_ptr<std::vector<uint8_t>> OpaqueHLLSketch::serialize() const {
36+
// TODO: could use a custom streambuf to avoid the
37+
// stream -> vec copy https://stackoverflow.com/a/13059195/1779853
38+
std::stringstream s{};
39+
auto start = s.tellg();
40+
this->inner_.serialize_compact(s);
41+
s.seekg(0, std::ios::end);
42+
auto stop = s.tellg();
43+
44+
std::vector<uint8_t> v(std::size_t(stop-start));
45+
s.seekg(0, std::ios::beg);
46+
s.read(reinterpret_cast<char*>(v.data()), std::streamsize(v.size()));
47+
48+
return std::unique_ptr<std::vector<uint8_t>>(new std::vector<uint8_t>(std::move(v)));
49+
}
50+
51+
std::unique_ptr<OpaqueHLLSketch> new_opaque_hll_sketch(unsigned lg_k, datasketches::target_hll_type tgt_type) {
52+
return std::unique_ptr<OpaqueHLLSketch>(new OpaqueHLLSketch { lg_k, tgt_type });
53+
}
54+
55+
// Factory used by the Rust bridge: copies the serialized bytes in `buf` into
// an in-memory stream and constructs an OpaqueHLLSketch from that stream.
std::unique_ptr<OpaqueHLLSketch> deserialize_opaque_hll_sketch(rust::Slice<const uint8_t> buf) {
    // TODO: could use a custom streambuf to avoid the slice -> stream copy
    std::stringstream stream{};
    const char* bytes = reinterpret_cast<const char*>(buf.data());
    stream.write(bytes, std::streamsize(buf.size()));

    // Rewind the read position so deserialization starts at the first byte.
    stream.seekg(0, std::ios::beg);

    std::unique_ptr<OpaqueHLLSketch> sketch(new OpaqueHLLSketch(stream));
    return sketch;
}
62+
63+
// Creates a wrapper around a new datasketches::hll_union, forwarding
// lg_max_k unchanged to the union's constructor.
OpaqueHLLUnion::OpaqueHLLUnion(uint8_t lg_max_k)
    : inner_(datasketches::hll_union(lg_max_k)) {}
66+
67+
// Materializes the union's current result as a new, independently owned
// OpaqueHLLSketch of the requested target type. The union is not modified.
std::unique_ptr<OpaqueHLLSketch> OpaqueHLLUnion::sketch(datasketches::target_hll_type tgt_type) const {
    datasketches::hll_sketch merged = inner_.get_result(tgt_type);
    return std::unique_ptr<OpaqueHLLSketch>(new OpaqueHLLSketch(std::move(merged)));
}
70+
71+
// Folds `to_add` into this union, consuming the sketch.
//
// The wrapped hll_sketch is moved into hll_union::update, and `to_add` is
// destroyed when this function returns.
//
// A null `to_add` is treated as "nothing to merge" and ignored: the pointer
// comes from Rust as a cxx UniquePtr, which may be null, and dereferencing
// it unchecked would be undefined behaviour.
void OpaqueHLLUnion::merge(std::unique_ptr<OpaqueHLLSketch> to_add) {
    if (!to_add) {
        return;
    }
    this->inner_.update(std::move(to_add->inner_));
}
74+
75+
76+
std::unique_ptr<OpaqueHLLUnion> new_opaque_hll_union(uint8_t lg_max_k) {
77+
return std::unique_ptr<OpaqueHLLUnion>(new OpaqueHLLUnion{ lg_max_k });
78+
}

datasketches-cpp/hll.hpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#pragma once
2+
3+
#include <cstdint>
4+
#include <iostream>
5+
#include <vector>
6+
#include <memory>
7+
8+
#include "rust/cxx.h"
9+
#include "hll/include/hll.hpp"
10+
11+
// alias
12+
typedef datasketches::target_hll_type target_hll_type;
13+
14+
class OpaqueHLLSketch {
15+
public:
16+
double estimate() const;
17+
void update(rust::Slice<const uint8_t> buf);
18+
void update_u64(uint64_t value);
19+
std::unique_ptr<std::vector<uint8_t>> serialize() const;
20+
friend std::unique_ptr<OpaqueHLLSketch> deserialize_opaque_hll_sketch(rust::Slice<const uint8_t> buf);
21+
OpaqueHLLSketch(unsigned lg_k, datasketches::target_hll_type tgt_type);
22+
private:
23+
OpaqueHLLSketch(datasketches::hll_sketch&& hll);
24+
OpaqueHLLSketch(std::istream& is);
25+
friend class OpaqueHLLUnion;
26+
datasketches::hll_sketch inner_;
27+
};
28+
29+
// Factory exposed to the Rust bridge: returns a heap-allocated
// OpaqueHLLSketch constructed from lg_k and tgt_type.
std::unique_ptr<OpaqueHLLSketch> new_opaque_hll_sketch(unsigned lg_k, datasketches::target_hll_type tgt_type);
30+
// Factory exposed to the Rust bridge: returns a heap-allocated
// OpaqueHLLSketch rebuilt from the serialized bytes in `buf`.
std::unique_ptr<OpaqueHLLSketch> deserialize_opaque_hll_sketch(rust::Slice<const uint8_t> buf);
31+
32+
class OpaqueHLLUnion {
33+
public:
34+
std::unique_ptr<OpaqueHLLSketch> sketch(datasketches::target_hll_type tgt_type) const;
35+
void merge(std::unique_ptr<OpaqueHLLSketch> to_add);
36+
OpaqueHLLUnion(uint8_t lg_max_k);
37+
private:
38+
datasketches::hll_union inner_;
39+
};
40+
41+
// Factory exposed to the Rust bridge: returns a heap-allocated
// OpaqueHLLUnion constructed from lg_max_k.
std::unique_ptr<OpaqueHLLUnion> new_opaque_hll_union(uint8_t lg_max_k);

src/bridge.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@ pub(crate) mod ffi {
1818
unsafe fn remove_from_hashset(hashset_addr: usize, addr: usize);
1919
}
2020

21+
// Rust-side definition of the enum shared with C++. The same name is declared
// as a C++ type in the `extern "C++"` section, and hll.hpp aliases it to
// `datasketches::target_hll_type`, so the variant names and their order must
// stay in sync with the C++ enum.
// NOTE(review): the variants presumably select 4/6/8 bits per HLL register;
// confirm against the datasketches-cpp documentation.
#[repr(i32)]
22+
enum target_hll_type {
23+
HLL_4,
24+
HLL_6,
25+
HLL_8,
26+
}
27+
2128
unsafe extern "C++" {
2229
include!("dsrs/datasketches-cpp/cpc.hpp");
2330

@@ -38,6 +45,31 @@ pub(crate) mod ffi {
3845
pub(crate) fn sketch(self: &OpaqueCpcUnion) -> UniquePtr<OpaqueCpcSketch>;
3946
pub(crate) fn merge(self: Pin<&mut OpaqueCpcUnion>, to_add: UniquePtr<OpaqueCpcSketch>);
4047

48+
// Header declaring the C++ HLL wrappers bound below.
include!("dsrs/datasketches-cpp/hll.hpp");
49+
50+
// C++ side of the shared `target_hll_type` enum (aliased in hll.hpp to
// `datasketches::target_hll_type`).
type target_hll_type;
51+
52+
// Opaque C++ wrapper around a `datasketches::hll_sketch`.
pub(crate) type OpaqueHLLSketch;
53+
// Returns the wrapped sketch's current estimate.
pub(crate) fn estimate(self: &OpaqueHLLSketch) -> f64;
54+
// Feeds one item, given as raw bytes, into the sketch.
pub(crate) fn update(self: Pin<&mut OpaqueHLLSketch>, buf: &[u8]);
55+
// Feeds one item, given as a `u64`, into the sketch.
pub(crate) fn update_u64(self: Pin<&mut OpaqueHLLSketch>, value: u64);
56+
// Returns the sketch's compact serialized form as a C++ byte vector.
pub(crate) fn serialize(self: &OpaqueHLLSketch) -> UniquePtr<CxxVector<u8>>;
57+
58+
// Allocates a new sketch on the C++ side from `lg_k` and `tgt_type`.
pub(crate) fn new_opaque_hll_sketch(
59+
lg_k: u32,
60+
tgt_type: target_hll_type,
61+
) -> UniquePtr<OpaqueHLLSketch>;
62+
// Allocates a sketch on the C++ side by deserializing the bytes in `buf`.
pub(crate) fn deserialize_opaque_hll_sketch(buf: &[u8]) -> UniquePtr<OpaqueHLLSketch>;
63+
64+
// Opaque C++ wrapper around a `datasketches::hll_union`.
pub(crate) type OpaqueHLLUnion;
65+
66+
// Allocates a new union on the C++ side from `lg_max_k`.
pub(crate) fn new_opaque_hll_union(lg_max_k: u8) -> UniquePtr<OpaqueHLLUnion>;
// Returns the union's current result as a new sketch of type `tgt_type`.
67+
pub(crate) fn sketch(
68+
self: &OpaqueHLLUnion,
69+
tgt_type: target_hll_type,
70+
) -> UniquePtr<OpaqueHLLSketch>;
71+
// Folds `to_add` into the union; ownership of `to_add` passes to C++.
pub(crate) fn merge(self: Pin<&mut OpaqueHLLUnion>, to_add: UniquePtr<OpaqueHLLSketch>);
72+
4173
include!("dsrs/datasketches-cpp/theta.hpp");
4274

4375
pub(crate) type OpaqueThetaSketch;

src/lib.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,4 @@ pub mod stream_reducer;
77
mod wrapper;
88

99
pub use error::DataSketchesError;
10-
pub use wrapper::CpcSketch;
11-
pub use wrapper::CpcUnion;
12-
pub use wrapper::HhSketch;
13-
pub use wrapper::StaticThetaSketch;
14-
pub use wrapper::ThetaIntersection;
15-
pub use wrapper::ThetaSketch;
16-
pub use wrapper::ThetaUnion;
10+
pub use wrapper::*;

src/wrapper.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77
88
mod cpc;
99
pub(crate) mod hh;
10+
mod hll;
1011
mod theta;
1112

1213
pub use cpc::{CpcSketch, CpcUnion};
1314
pub use hh::HhSketch;
15+
pub use hll::{HLLSketch, HLLType, HLLUnion};
1416
pub use theta::{StaticThetaSketch, ThetaIntersection, ThetaSketch, ThetaUnion};

0 commit comments

Comments
 (0)