Skip to content

Commit 4bb6fa2

Browse files
authored
core: Add data structure for estimate which decays over time (#5879)
* core: Add count min sketch and multisketch A count min sketch tracks estimates for added values. The multisketch provides a sliding window feature to expire old estimates and a decay factor to be applied to older estimates. Signed-off-by: Abhijat Malviya <[email protected]>
1 parent accebe4 commit 4bb6fa2

File tree

4 files changed

+409
-1
lines changed

4 files changed

+409
-1
lines changed

src/core/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ add_library(dfly_core allocation_tracker.cc bloom.cc compact_object.cc dense_set
2727
interpreter.cc glob_matcher.cc mi_memory_resource.cc qlist.cc sds_utils.cc
2828
segment_allocator.cc score_map.cc small_string.cc sorted_map.cc task_queue.cc
2929
tx_queue.cc string_set.cc string_map.cc top_keys.cc detail/bitpacking.cc
30-
page_usage_stats.cc)
30+
page_usage_stats.cc count_min_sketch.cc)
3131

3232
cxx_link(dfly_core base dfly_search_core fibers2 jsonpath
3333
absl::flat_hash_map absl::str_format absl::random_random redis_lib
@@ -56,6 +56,7 @@ cxx_test(qlist_test dfly_core DATA testdata/list.txt.zst LABELS DFLY)
5656
cxx_test(zstd_test dfly_core TRDP::zstd LABELS DFLY)
5757
cxx_test(top_keys_test dfly_core LABELS DFLY)
5858
cxx_test(page_usage_stats_test dfly_core LABELS DFLY)
59+
cxx_test(count_min_sketch_test dfly_core LABELS DFLY)
5960

6061
if(LIB_PCRE2)
6162
target_compile_definitions(dfly_core_test PRIVATE USE_PCRE2=1)

src/core/count_min_sketch.cc

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Copyright 2025, DragonflyDB authors. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
5+
#include "core/count_min_sketch.h"
6+
7+
#include <absl/time/clock.h>
8+
#include <xxhash.h>
9+
10+
#include <cmath>
11+
#include <functional>
12+
#include <iostream>
13+
14+
namespace {
15+
16+
constexpr auto MAX = std::numeric_limits<dfly::CountMinSketch::SizeT>::max();
17+
18+
uint64_t GetCurrentMS() {
19+
return absl::GetCurrentTimeNanos() / 1000 / 1000;
20+
}
21+
22+
// Exponential decay model: returns `value` reduced according to `time_delta` milliseconds of
// age, halving every 5000 ms. The result is truncated towards zero.
//
// A zero or negative `time_delta` (for example if the clock source stepped backwards between
// two readings) is treated as "no time elapsed" and returns `value` unchanged, so that a decay
// function can never increase an estimate.
uint64_t ExponentialDecay(uint64_t value, int64_t time_delta) {
  // ln(2) / 5000
  static constexpr double EXP_DECAY_CONST = 0.000138629;

  if (time_delta <= 0) {
    return value;
  }

  return value * std::exp(-time_delta * EXP_DECAY_CONST);
}
28+
29+
// Linear decay model: returns `value` reduced by one unit for every 1000 ms of `time_delta`,
// never going below zero. Fractional results are truncated towards zero.
//
// A zero or negative `time_delta` (for example if the clock source stepped backwards between
// two readings) is treated as "no time elapsed" and returns `value` unchanged; without this
// guard a negative delta would produce a negative decay and inflate the value.
uint64_t LinearDecay(uint64_t value, int64_t time_delta) {
  // Value decrements by one every 1000 ms
  static constexpr double LIN_DECAY_CONST = 0.001;

  if (time_delta <= 0) {
    return value;
  }

  const double decay = time_delta * LIN_DECAY_CONST;
  // Cap the decrement at the value itself so the subtraction cannot go negative.
  return value - std::min(static_cast<double>(value), decay);
}
36+
37+
using DecayFn = std::function<uint64_t(uint64_t, int64_t)>;
38+
39+
std::array<DecayFn, 3> decay_fns = {ExponentialDecay, LinearDecay, [](auto v, auto) { return v; }};
40+
41+
} // namespace
42+
43+
namespace dfly {
44+
45+
// Allocates a zeroed counter table of `depth_` rows by `width_` columns, where
//   width_ = floor(e / epsilon)   and   depth_ = floor(ln(1 / delta)).
// Both epsilon and delta are expected to be positive. Each dimension is clamped to at least one:
// a zero width would make Hash() take a remainder by zero, and a zero depth would leave no row
// to take an estimate from. The default arguments declared in the header are unaffected by the
// clamp.
CountMinSketch::CountMinSketch(double epsilon, double delta) {
  const double width = std::exp(1) / epsilon;
  const double depth = std::log(1.0 / delta);

  // `!(x >= 1.0)` is used instead of `x < 1.0` so that NaN is also mapped to the minimum.
  width_ = !(width >= 1.0) ? 1 : static_cast<uint64_t>(width);
  depth_ = !(depth >= 1.0) ? 1 : static_cast<uint64_t>(depth);

  counters_.reserve(depth_);
  for (uint64_t i = 0; i < depth_; ++i) {
    counters_.emplace_back(width_, 0);
  }
}
53+
54+
void CountMinSketch::Update(uint64_t key, CountMinSketch::SizeT incr) {
55+
uint64_t i = 0;
56+
std::for_each(counters_.begin(), counters_.end(), [&](auto& counter) {
57+
// It is possible to compute just two initial hashes and then use them to derive next i-2
58+
// hashes, but it results in a lot more collisions and thus much larger overestimates.
59+
const uint64_t index = Hash(key, i++);
60+
const SizeT curr = counter[index];
61+
const SizeT updated = curr + incr;
62+
counter[index] = updated < curr ? MAX : updated;
63+
});
64+
}
65+
66+
// Returns the smallest counter associated with `key` across all rows. Collisions can only add to
// a counter, so the minimum is the tightest available estimate.
CountMinSketch::SizeT CountMinSketch::EstimateFrequency(uint64_t key) const {
  SizeT lowest = MAX;
  uint64_t row = 0;
  for (const auto& cells : counters_) {
    lowest = std::min(lowest, cells[Hash(key, row)]);
    ++row;
  }
  return lowest;
}
73+
74+
void CountMinSketch::Reset() {
75+
for (auto& ctr : counters_) {
76+
std::fill(ctr.begin(), ctr.end(), 0);
77+
}
78+
}
79+
80+
uint64_t CountMinSketch::Hash(uint64_t key, uint64_t i) const {
81+
return XXH3_64bits_withSeed(&key, sizeof(key), i) % width_;
82+
}
83+
84+
// Creates all internal sketches with the given error bounds and stamps each of them with the
// construction time as both start and end time. The last slot starts out as the current sketch.
MultiSketch::MultiSketch(uint64_t rollover_ms, double epsilon, double delta, Decay decay)
    : rollover_ms_(rollover_ms), current_sketch_(sketches_.size() - 1), decay_t_(decay) {
  const uint64_t created_at = GetCurrentMS();
  for (SketchWithTimestamp& slot : sketches_) {
    slot = SketchWithTimestamp{CountMinSketch{epsilon, delta}, created_at, created_at};
  }
}
91+
92+
void MultiSketch::Update(uint64_t key, CountMinSketch::SizeT incr) {
93+
if (++rollover_check_ >= rollover_check_every_) {
94+
MaybeRolloverCurrentSketch();
95+
rollover_check_ = 0;
96+
}
97+
sketches_[current_sketch_].sketch_.Update(key, incr);
98+
}
99+
100+
// Returns the combined estimate for `key`: the estimate of every internal sketch is passed
// through the configured decay model, using the time elapsed since that sketch's start time, and
// the decayed values are added up.
//
// The sum is accumulated in 64 bits and saturated at the largest representable counter value.
// Accumulating directly in SizeT would let the sum of several sketches wrap around and report a
// small count for a very frequent key, which is inconsistent with the saturating behaviour of
// CountMinSketch::Update.
CountMinSketch::SizeT MultiSketch::EstimateFrequency(uint64_t key) const {
  const uint64_t now = GetCurrentMS();
  const auto& decay = decay_fns[static_cast<uint8_t>(decay_t_)];

  uint64_t total = 0;
  for (const auto& sketch : sketches_) {
    const auto e = sketch.sketch_.EstimateFrequency(key);
    // TODO use average time of sketch to compute delta
    total += decay(e, now - sketch.start_time_);
  }

  return static_cast<CountMinSketch::SizeT>(std::min<uint64_t>(total, MAX));
}
111+
112+
void MultiSketch::MaybeRolloverCurrentSketch() {
113+
const uint64_t now = GetCurrentMS();
114+
const uint64_t oldest = (current_sketch_ + 1) % sketches_.size();
115+
if (const uint64_t oldest_ts = sketches_[oldest].start_time_; now - oldest_ts > rollover_ms_) {
116+
sketches_[oldest].sketch_.Reset();
117+
sketches_[oldest].start_time_ = now;
118+
sketches_[current_sketch_].end_time_ = now;
119+
current_sketch_ = oldest;
120+
}
121+
}
122+
123+
} // namespace dfly

src/core/count_min_sketch.h

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright 2025, DragonflyDB authors. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
#pragma once
5+
6+
#include <array>
7+
#include <cstdint>
8+
#include <vector>
9+
10+
namespace dfly {
11+
12+
/// The CountMinSketch and the MultiSketch together are intended to record values which naturally
13+
/// reduce over time, but for which explicitly recording decrements is not possible or easy.
14+
/// An example is short-lived, large memory allocations. The allocation site can record the size of
15+
/// the block allocated, and the count reduces over time, eventually going down to zero.
16+
17+
// Keeps count of items added with a small probability of overestimating counts due to hash
18+
// collisions. Counts are stored in a table where each row stores counts for each item, and the
19+
// minimum count across all rows is returned for an item when requested.
20+
class CountMinSketch {
 public:
  // Type of a single counter; increments and estimates use the same type.
  using SizeT = uint16_t;

  // Sizes the counter table from two error bounds.
  //
  // epsilon bounds how far an estimate may exceed the true frequency, relative to the sum N of
  // all recorded frequencies:
  //   f_actual <= f_estimated <= f_actual + epsilon * N
  // delta is the failure probability: the chance that a single estimate exceeds the bound above,
  // in which case nothing can be said about the returned value.
  //
  // The defaults produce a table of 27182 columns by 9 rows of two-byte counters, a little under
  // 480 KiB of counter storage.
  explicit CountMinSketch(double epsilon = 0.0001, double delta = 0.0001);

  // The sketch is move-only.
  CountMinSketch(CountMinSketch&& other) noexcept = default;
  CountMinSketch& operator=(CountMinSketch&& other) noexcept = default;

  CountMinSketch(const CountMinSketch& other) = delete;
  CountMinSketch& operator=(const CountMinSketch& other) = delete;

  // Adds incr to the count kept for key. One hash is computed per table row, so this can be a
  // comparatively slow operation.
  void Update(uint64_t key, SizeT incr = 1);

  // Returns the estimated count for key. The estimate may be higher than the real count, with a
  // small probability of exceeding the configured error bound.
  SizeT EstimateFrequency(uint64_t key) const;

  // Sets every counter back to zero, discarding all recorded counts.
  void Reset();

 private:
  // Column index for key in row i.
  uint64_t Hash(uint64_t key, uint64_t i) const;

  // One inner vector of width_ counters per row.
  std::vector<std::vector<SizeT>> counters_;
  uint64_t width_;
  uint64_t depth_;
};
58+
59+
// Maintains a list of three sketches with timestamps. Updates are made to the current sketch.
60+
// Once the oldest sketch is older than a fixed limit, it is discarded and becomes the current
61+
// sketch. Estimates are the sum across all sketches. The counts returned by the sketches "decay" to
62+
// lower values as the sketches become older.
63+
class MultiSketch {
64+
struct SketchWithTimestamp {
65+
CountMinSketch sketch_;
66+
uint64_t start_time_{0};
67+
uint64_t end_time_{0};
68+
};
69+
70+
public:
71+
// The decay model decides how fast values in sketches reduce as time passes.
72+
// Exponential: larger values reduce faster
73+
// Linear: all values decrease at a fixed rate
74+
// SlidingWindow: values do not decrease until the sketch containing them resets
75+
enum class Decay : uint8_t {
76+
Exponential,
77+
Linear,
78+
SlidingWindow,
79+
};
80+
81+
explicit MultiSketch(uint64_t rollover_ms = 1000, double epsilon = 0.0001, double delta = 0.0001,
82+
Decay decay = Decay::Linear);
83+
84+
// Updates the current sketch, which is associated with the latest timestamp. Can cause the oldest
85+
// sketch to be reset as a side effect if the oldest sketch is older than rollover_ms.
86+
void Update(uint64_t key, CountMinSketch::SizeT incr = 1);
87+
88+
// Returns estimate by summing estimates from all internal sketches.
89+
CountMinSketch::SizeT EstimateFrequency(uint64_t key) const;
90+
91+
// For unit tests, allow setting a smaller limit
92+
void SetRolloverCheckLimit(uint64_t rollover_check_limit) {
93+
rollover_check_every_ = rollover_check_limit;
94+
}
95+
96+
private:
97+
void MaybeRolloverCurrentSketch();
98+
99+
std::array<SketchWithTimestamp, 3> sketches_;
100+
uint64_t rollover_ms_;
101+
uint64_t current_sketch_;
102+
103+
// Do a rollover check every N calls to avoid expensive GetTime calls
104+
uint64_t rollover_check_every_{512};
105+
uint64_t rollover_check_{0};
106+
Decay decay_t_;
107+
};
108+
109+
} // namespace dfly

0 commit comments

Comments
 (0)