Skip to content

Commit c40c50c

Browse files
committed
add filter.Merge
1 parent e7aebbb commit c40c50c

File tree

6 files changed

+100
-15
lines changed

6 files changed

+100
-15
lines changed

badgerdb.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,8 @@ func (store *BadgerStore) isReady() bool {
9292
return store.db != nil
9393
}
9494

95+
func (store *BadgerStore) DB() interface{} {
96+
return store.db
97+
}
98+
9599
var _ Store = (*BadgerStore)(nil)

bloom.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ type BloomFilter struct {
3838
// m is the number of bits per slice (one slice per hash function)
3939
m int
4040

41+
// k is the number of hash functions
42+
k int
43+
4144
// one seed per hash function
4245
seeds []int64
4346

@@ -132,6 +135,7 @@ func NewBloom(opts *BloomOptions) *BloomFilter {
132135
lock: sync.Mutex{},
133136
byteSize: byteSize,
134137
path: opts.Path,
138+
k: numHashFn,
135139
}
136140
}
137141

@@ -209,6 +213,30 @@ func (bf *BloomFilter) Get(key []byte) []byte {
209213

210214
}
211215

216+
// Merge merges the filter with another bloom filter.
217+
// Both filters must have the same capacity and error rate.
218+
// merging increases the false positive rate of the resulting filter
219+
func (bf *BloomFilter) Merge(bf2 *BloomFilter) error {
220+
if bf.k != bf2.k {
221+
return fmt.Errorf("BloomFilter k values do not match")
222+
}
223+
if bf.bit_width != bf2.bit_width {
224+
return fmt.Errorf("BloomFilter bit_width values do not match")
225+
}
226+
227+
bf.lock.Lock()
228+
defer bf.lock.Unlock()
229+
230+
bf2.lock.Lock()
231+
defer bf2.lock.Unlock()
232+
233+
for i := 0; i < bf.bit_width; i++ {
234+
bf.mem[i] |= bf2.mem[i]
235+
}
236+
237+
return nil
238+
}
239+
212240
// hasStore reports whether a store is attached to the filter and that store
// reports itself as ready.
func (bf *BloomFilter) hasStore() bool {
	if bf.db == nil {
		return false
	}
	return bf.db.isReady()
}
@@ -297,6 +325,11 @@ func (bf *BloomFilter) FilterSize() int {
297325
return bf.bit_width
298326
}
299327

328+
// DB returns the underlying persistent store
329+
func (bf *BloomFilter) DB() interface{} {
330+
return bf.db.DB()
331+
}
332+
300333
// divmod returns the quotient and remainder of a/b
301334
func divmod(num, denom int64) (quot, rem int64) {
302335
quot = num / denom

bloom_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,63 @@ func TestBloomFilter_Add(t *testing.T) {
126126
t.Errorf("Expected function to panic when there is no persistent store, got %s", val)
127127
})
128128
}
129+
130+
func TestBloomFilter_Merge(t *testing.T) {
131+
opts := &BloomOptions{
132+
Err_rate: 0.01,
133+
Capacity: 1000,
134+
Path: "./test.db",
135+
}
136+
bf := NewBloom(opts)
137+
bf2 := NewBloom(opts)
138+
139+
defer func() {
140+
bf.Close()
141+
bf2.Close()
142+
os.Remove(opts.Path)
143+
}()
144+
145+
t.Run("merge success", func(t *testing.T) {
146+
err := bf.Merge(bf2)
147+
if err != nil {
148+
t.Errorf("Expected no error, got %v", err)
149+
}
150+
})
151+
152+
t.Run("merge should return an error when the filters dont match", func(t *testing.T) {
153+
opts := &BloomOptions{
154+
Err_rate: 0.01,
155+
Capacity: 10000,
156+
Path: "./test.db",
157+
}
158+
bf2 := NewBloom(opts)
159+
bf.Merge(bf2)
160+
defer func() {
161+
bf2.Close()
162+
os.Remove(opts.Path)
163+
}()
164+
165+
err := bf.Merge(bf2)
166+
if err == nil {
167+
t.Errorf("Expected error, got nil")
168+
}
169+
})
170+
171+
t.Run("object added to the single filters should be found in the resulting merge", func(t *testing.T) {
172+
key, val := []byte("foo"), []byte("bar")
173+
bf := NewBloom(opts)
174+
bf2 := NewBloom(opts)
175+
bf2.Add(key, val)
176+
err := bf.Merge(bf2)
177+
if err != nil {
178+
t.Errorf("Expected no error, got %v", err)
179+
}
180+
if !bf.Contains(key) {
181+
t.Errorf("Expected key %s to be found in the merged filter", string(key))
182+
}
183+
})
184+
185+
}
129186
func TestBloomFilter_AddToDB(t *testing.T) {
130187
store, cleanupFunc := DBSetupTest(t)
131188
defer cleanupFunc()

boltdb.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,4 +99,8 @@ func (store *BoltStore) isReady() bool {
9999
return store.db != nil
100100
}
101101

102+
func (store *BoltStore) DB() interface{} {
103+
return store.db
104+
}
105+
102106
var _ Store = (*BoltStore)(nil)

readme.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ A bloom filter is a probabilistic data structure that is used to determine if an
44

55
To fulfil the false positive rate, bloom filters are initialized with a capacity. The capacity is the number of elements that can be inserted into the bloom filter, and this cannot be changed.
66

7-
Sprout implements a bloom filter in Go, while using boltdb and badgerdb as optional in-memory persistent storage. Sprout writes the bloom filter to a memory-mapped file.
7+
Sprout implements a bloom filter in Go, with boltdb and badgerdb available as optional persistent storage for the values. The bloom filter itself is written to a memory-mapped file.
88

99
Sprout also implements the scalable bloom filter described in a paper written by [P. Almeida, C.Baquero, N. Preguiça, D. Hutchison](https://haslab.uminho.pt/cbm/files/dbloom.pdf).
1010

storage.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,10 @@
11
package sprout
22

3-
import (
4-
"os"
5-
6-
badger "github.com/dgraph-io/badger/v3"
7-
bolt "go.etcd.io/bbolt"
8-
)
9-
103
// Store is the storage backend interface of the package. BadgerStore and
// BoltStore both assert that they satisfy it.
//
// open, Close, Get and Put presumably manage the backend's lifecycle and
// key/value access — their implementations are not visible here, so confirm
// the exact contracts against the concrete stores. isReady reports whether
// the backend's database handle is non-nil. DB exposes the backend's
// underlying database handle as an untyped value.
type Store interface {
114
open() error
125
Close() error
136
Get(key []byte) ([]byte, error)
147
Put(key, value []byte) error
158
isReady() bool
16-
}
17-
18-
type StoreOptions struct {
19-
BucketName string
20-
Filemode os.FileMode
21-
boltOpts *bolt.Options
22-
badgerOpts *badger.Options
9+
DB() interface{}
2310
}

0 commit comments

Comments
 (0)