Skip to content

Commit 053a440

Browse files
author
Harshil Goel
authored
perf(core): Add a count min sketch for eq filter (#9218)
1 parent 408e064 commit 053a440

File tree

9 files changed

+521
-19
lines changed

9 files changed

+521
-19
lines changed

algo/cm-sketch.go

Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
/*
2+
* Copyright 2016-2025 Hypermode Inc. and Contributors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package algo
18+
19+
import (
20+
"bytes"
21+
"encoding/binary"
22+
"errors"
23+
"fmt"
24+
"hash"
25+
"hash/fnv"
26+
"io"
27+
"math"
28+
)
29+
30+
// This code is copied from https://www.github.com/tylertreat/BoomFilters/refs/heads/master/countmin.go

// CountMinSketch is a Count-Min Sketch, the probabilistic frequency counter
// introduced by Cormode and Muthukrishnan in "An Improved Data Stream
// Summary: The Count-Min Sketch and its Applications":
//
// http://dimacs.rutgers.edu/~graham/pubs/papers/cm-full.pdf
//
// Instead of keeping one exact counter per distinct item (as a hash map
// would), the sketch keeps a fixed depth x width matrix of counters. Every
// item is hashed to one cell per row; recording the item updates those cells
// and querying it reports the smallest of them. Memory use is therefore
// independent of the number of distinct items, at the price of an
// approximate answer whose quality is governed by epsilon and delta.
//
// This makes the sketch a good fit for estimating frequencies over very large
// data sets or unbounded streams, where a counter per item is impractical.
// To estimate set cardinality rather than per-item frequency, use a
// HyperLogLog instead.
type CountMinSketch struct {
	matrix  [][]uint64  // depth rows of width counters each
	width   uint        // number of counters per row
	depth   uint        // number of rows (one derived hash per row)
	count   uint64      // running total of recorded items
	epsilon float64     // accuracy factor the width was derived from
	delta   float64     // probability parameter the depth was derived from
	hash    hash.Hash64 // base hash; every row's index is derived from it
}
59+
60+
// hashKernel feeds data through the supplied 64-bit hash and splits the
// resulting digest into two 32-bit halves, which callers combine to derive
// one index per matrix row. The first return value is the low 32 bits of the
// digest and the second is the high 32 bits. The hash is reset before
// returning so it can be reused for the next item.
func hashKernel(data []byte, hash hash.Hash64) (uint32, uint32) {
	// hash.Hash documents that Write never returns an error.
	hash.Write(data)
	digest := hash.Sum64()
	hash.Reset()

	low := uint32(digest)
	high := uint32(digest >> 32)
	return low, high
}
70+
71+
// NewCountMinSketch creates a new Count-Min Sketch whose relative accuracy is
72+
// within a factor of epsilon with probability delta. Both of these parameters
73+
// affect the space and time complexity.
74+
func NewCountMinSketch(epsilon, delta float64) *CountMinSketch {
75+
var (
76+
width = uint(math.Ceil(math.E / epsilon))
77+
depth = uint(math.Ceil(math.Log(1 / delta)))
78+
matrix = make([][]uint64, depth)
79+
)
80+
81+
for i := uint(0); i < depth; i++ {
82+
matrix[i] = make([]uint64, width)
83+
}
84+
85+
return &CountMinSketch{
86+
matrix: matrix,
87+
width: width,
88+
depth: depth,
89+
epsilon: epsilon,
90+
delta: delta,
91+
hash: fnv.New64(),
92+
}
93+
}
94+
95+
// Epsilon returns the relative-accuracy factor, epsilon.
96+
func (c *CountMinSketch) Epsilon() float64 {
97+
return c.epsilon
98+
}
99+
100+
// Delta returns the relative-accuracy probability, delta.
101+
func (c *CountMinSketch) Delta() float64 {
102+
return c.delta
103+
}
104+
105+
// TotalCount returns the number of items added to the sketch.
106+
func (c *CountMinSketch) TotalCount() uint64 {
107+
return c.count
108+
}
109+
110+
// AddInt will add the data to the set n times. Returns the CountMinSketch to allow for
111+
// chaining.
112+
func (c *CountMinSketch) AddInt(data []byte, n uint64) *CountMinSketch {
113+
lower, upper := hashKernel(data, c.hash)
114+
existingValue := uint64(math.MaxUint64)
115+
116+
// Increment count in each row.
117+
for i := uint(0); i < c.depth; i++ {
118+
index := (uint(lower) + uint(upper)*i) % c.width
119+
val := c.matrix[i][index]
120+
if val < existingValue {
121+
existingValue = val
122+
}
123+
if c.matrix[i][index] < n {
124+
c.matrix[i][index] = n
125+
}
126+
}
127+
128+
if n > existingValue {
129+
c.count += n - existingValue
130+
}
131+
return c
132+
}
133+
134+
// Add will add the data to the set. Returns the CountMinSketch to allow for
135+
// chaining.
136+
func (c *CountMinSketch) Add(data []byte) *CountMinSketch {
137+
lower, upper := hashKernel(data, c.hash)
138+
139+
// Increment count in each row.
140+
for i := uint(0); i < c.depth; i++ {
141+
c.matrix[i][(uint(lower)+uint(upper)*i)%c.width]++
142+
}
143+
144+
c.count++
145+
return c
146+
}
147+
148+
// Count returns the approximate count for the specified item, correct within
149+
// epsilon * total count with a probability of delta.
150+
func (c *CountMinSketch) Count(data []byte) uint64 {
151+
var (
152+
lower, upper = hashKernel(data, c.hash)
153+
count = uint64(math.MaxUint64)
154+
)
155+
156+
for i := uint(0); i < c.depth; i++ {
157+
count = uint64(math.Min(float64(count),
158+
float64(c.matrix[i][(uint(lower)+uint(upper)*i)%c.width])))
159+
}
160+
161+
return count
162+
}
163+
164+
// Merge combines this CountMinSketch with another. Returns an error if the
165+
// matrix width and depth are not equal.
166+
func (c *CountMinSketch) Merge(other *CountMinSketch) error {
167+
if c.depth != other.depth {
168+
return errors.New("matrix depth must match")
169+
}
170+
171+
if c.width != other.width {
172+
return errors.New("matrix width must match")
173+
}
174+
175+
for i := uint(0); i < c.depth; i++ {
176+
for j := uint(0); j < c.width; j++ {
177+
c.matrix[i][j] += other.matrix[i][j]
178+
}
179+
}
180+
181+
c.count += other.count
182+
return nil
183+
}
184+
185+
// Reset restores the CountMinSketch to its original state. It returns itself
186+
// to allow for chaining.
187+
func (c *CountMinSketch) Reset() *CountMinSketch {
188+
for i := 0; i < len(c.matrix); i++ {
189+
for j := 0; j < len(c.matrix[i]); j++ {
190+
c.matrix[i][j] = 0
191+
}
192+
}
193+
194+
c.count = 0
195+
return c
196+
}
197+
198+
// SetHash sets the hashing function used.
199+
func (c *CountMinSketch) SetHash(h hash.Hash64) {
200+
c.hash = h
201+
}
202+
203+
// WriteDataTo writes a binary representation of the CMS data to
204+
// an io stream. It returns the number of bytes written and error
205+
func (c *CountMinSketch) WriteDataTo(stream io.Writer) (int, error) {
206+
buf := new(bytes.Buffer)
207+
// serialize epsilon and delta as cms configuration check
208+
err := binary.Write(buf, binary.LittleEndian, c.epsilon)
209+
if err != nil {
210+
return 0, err
211+
}
212+
err = binary.Write(buf, binary.LittleEndian, c.delta)
213+
if err != nil {
214+
return 0, err
215+
}
216+
err = binary.Write(buf, binary.LittleEndian, c.count)
217+
if err != nil {
218+
return 0, err
219+
}
220+
// encode matrix
221+
for i := range c.matrix {
222+
err = binary.Write(buf, binary.LittleEndian, c.matrix[i])
223+
if err != nil {
224+
return 0, err
225+
}
226+
}
227+
228+
return stream.Write(buf.Bytes())
229+
}
230+
231+
// ReadDataFrom reads a binary representation of the CMS data written
232+
// by WriteDataTo() from io stream. It returns the number of bytes read
233+
// and error
234+
// If serialized CMS configuration is different it returns error with expected params
235+
func (c *CountMinSketch) ReadDataFrom(stream io.Reader) (int, error) {
236+
var (
237+
count uint64
238+
epsilon, delta float64
239+
)
240+
241+
err := binary.Read(stream, binary.LittleEndian, &epsilon)
242+
if err != nil {
243+
return 0, err
244+
}
245+
err = binary.Read(stream, binary.LittleEndian, &delta)
246+
if err != nil {
247+
return 0, err
248+
}
249+
250+
// check if serialized and target cms configurations are same
251+
if c.epsilon != epsilon || c.delta != delta {
252+
return 0, fmt.Errorf("expected cms values for epsilon %f and delta %f", epsilon, delta)
253+
}
254+
255+
err = binary.Read(stream, binary.LittleEndian, &count)
256+
if err != nil {
257+
return 0, err
258+
}
259+
260+
for i := uint(0); i < c.depth; i++ {
261+
err = binary.Read(stream, binary.LittleEndian, c.matrix[i])
262+
}
263+
// count size of matrix and count
264+
size := int(c.depth*c.width)*binary.Size(uint64(0)) + binary.Size(count) + 2*binary.Size(float64(0))
265+
266+
c.count = count
267+
268+
return size, err
269+
}
270+
271+
// TestAndRemove attemps to remove n counts of data from the CMS. If
272+
// n is greater than the data count, TestAndRemove is a no-op and
273+
// returns false. Else, return true and decrement count by n.
274+
func (c *CountMinSketch) TestAndRemove(data []byte, n uint64) bool {
275+
h, count := c.traverseDepth(data)
276+
277+
if n > count {
278+
return false
279+
}
280+
281+
for i := uint(0); i < c.depth; i++ {
282+
*h[i] -= n
283+
}
284+
285+
return true
286+
}
287+
288+
// TestAndRemoveAll counts data frequency, performs TestAndRemove(data, count),
289+
// and returns true if count is positive. If count is 0, TestAndRemoveAll is a
290+
// no-op and returns false.
291+
func (c *CountMinSketch) TestAndRemoveAll(data []byte) bool {
292+
h, count := c.traverseDepth(data)
293+
294+
if count == 0 {
295+
return false
296+
}
297+
298+
for i := uint(0); i < c.depth; i++ {
299+
*h[i] -= count
300+
}
301+
302+
return true
303+
}
304+
305+
func (c *CountMinSketch) traverseDepth(data []byte) ([]*uint64, uint64) {
306+
var (
307+
lower, upper = hashKernel(data, c.hash)
308+
count = uint64(math.MaxUint64)
309+
h = make([]*uint64, c.depth)
310+
)
311+
312+
for i := uint(0); i < c.depth; i++ {
313+
h[i] = &c.matrix[i][(uint(lower)+uint(upper)*i)%c.width]
314+
count = uint64(math.Min(float64(count), float64(*h[i])))
315+
}
316+
317+
return h, count
318+
}

dgraph/cmd/alpha/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,7 @@ func run() {
668668
AuthToken: security.GetString("token"),
669669
Audit: conf,
670670
ChangeDataConf: Alpha.Conf.GetString("cdc"),
671-
TypeFilterUidLimit: x.Config.Limit.GetInt64("type-filter-uid-limit"),
671+
TypeFilterUidLimit: x.Config.Limit.GetUint64("type-filter-uid-limit"),
672672
}
673673

674674
keys, err := ee.GetKeys(Alpha.Conf)

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ require (
8282
github.com/blevesearch/upsidedown_store_api v1.0.2 // indirect
8383
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
8484
github.com/cespare/xxhash/v2 v2.3.0 // indirect
85-
github.com/chewxy/math32 v1.10.1 // indirect
85+
github.com/chewxy/math32 v1.11.0 // indirect
8686
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
8787
github.com/distribution/reference v0.5.0 // indirect
8888
github.com/docker/go-units v0.5.0 // indirect

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,8 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
113113
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
114114
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
115115
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
116-
github.com/chewxy/math32 v1.10.1 h1:LFpeY0SLJXeaiej/eIp2L40VYfscTvKh/FSEZ68uMkU=
117-
github.com/chewxy/math32 v1.10.1/go.mod h1:dOB2rcuFrCn6UHrze36WSLVPKtzPMRAQvBvUwkSsLqs=
116+
github.com/chewxy/math32 v1.11.0 h1:8sek2JWqeaKkVnHa7bPVqCEOUPbARo4SGxs6toKyAOo=
117+
github.com/chewxy/math32 v1.11.0/go.mod h1:dOB2rcuFrCn6UHrze36WSLVPKtzPMRAQvBvUwkSsLqs=
118118
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
119119
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
120120
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=

posting/mvcc.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,9 +399,14 @@ func (c *Cache) clear() {
399399
}
400400

401401
type MemoryLayer struct {
402+
// config
402403
deleteOnUpdates bool
403-
cache *Cache
404404

405+
// data
406+
cache *Cache
407+
408+
// metrics
409+
statsHolder *StatsHolder
405410
numDisksRead int
406411
}
407412

@@ -412,9 +417,14 @@ func (ml *MemoryLayer) del(key []byte) {
412417
ml.cache.del(key)
413418
}
414419

420+
func GetStatsHolder() *StatsHolder {
421+
return memoryLayer.statsHolder
422+
}
423+
415424
func initMemoryLayer(cacheSize int64, deleteOnUpdates bool) *MemoryLayer {
416425
ml := &MemoryLayer{}
417426
ml.deleteOnUpdates = deleteOnUpdates
427+
ml.statsHolder = NewStatsHolder()
418428
if cacheSize > 0 {
419429
cache, err := ristretto.NewCache[[]byte, *CachePL](&ristretto.Config[[]byte, *CachePL]{
420430
// Use 5% of cache memory for storing counters.

0 commit comments

Comments
 (0)