Skip to content

Commit 39fc0f4

Browse files
authored
add mmaped bloom filter (#1)
* add mmaped bloom filter * use 1 bit each * rename BF * update scalable bloom * rename to sprout * rename to sprout * remove murmur library * put murmur in pkg * put murmur in pkg * change find to contains
1 parent f2f0690 commit 39fc0f4

File tree

17 files changed

+943
-147
lines changed

17 files changed

+943
-147
lines changed

badgerdb.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package gobloomgo
1+
package sprout
22

33
import (
44
"fmt"

badgerdb_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package gobloomgo
1+
package sprout
22

33
import (
44
"fmt"

bloom.go

Lines changed: 162 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
1-
package gobloomgo
1+
package sprout
22

33
import (
44
"fmt"
55
"log"
66
"math"
7+
"os"
8+
"sync"
9+
"unsafe"
710

8-
"github.com/spaolacci/murmur3"
11+
"github.com/dsa0x/sprout/pkg/murmur"
12+
"github.com/edsrzf/mmap-go"
913
)
1014

11-
var ErrKeyNotFound = fmt.Errorf("Key not found")
12-
1315
type BloomFilter struct {
1416

1517
// The desired false positive rate
@@ -27,64 +29,122 @@ type BloomFilter struct {
2729
// the number of items added to the bloom filter
2830
count int
2931

30-
// the bit array
31-
bit_array []bool
32+
memFile *os.File
33+
mem mmap.MMap
34+
pageOffset int
35+
lock sync.Mutex
36+
byteSize int
3237

3338
// m is the number bits per slice(hashFn)
3439
m int
3540

3641
// one seed per hash function
3742
seeds []int64
43+
44+
path string
45+
}
46+
47+
type BloomOptions struct {
48+
49+
// path to the filter
50+
Path string
51+
52+
// The desired false positive rate
53+
Err_rate float64
54+
55+
// the number of items intended to be added to the bloom filter (n)
56+
Capacity int
57+
58+
// persistent storage
59+
Database Store
60+
61+
// growth rate of the bloom filter (valid values are 2 and 4)
62+
GrowthRate int
63+
64+
Scalable bool
3865
}
3966

4067
// NewBloom creates a new bloom filter.
41-
// err_rate is the desired false positive rate. e.g. 0.1 error rate implies 1 in 1000
68+
// err_rate is the desired false error rate. e.g. 0.001 implies 1 in 1000
4269
//
4370
// capacity is the number of entries intended to be added to the filter
4471
//
4572
// database is the persistent store to attach to the filter. can be nil.
46-
func NewBloom(err_rate float64, capacity int, database Store) *BloomFilter {
47-
if err_rate <= 0 || err_rate >= 1 {
73+
func NewBloom(opts *BloomOptions) *BloomFilter {
74+
if opts.Err_rate <= 0 || opts.Err_rate >= 1 {
4875
panic("Error rate must be between 0 and 1")
4976
}
50-
if capacity <= 0 {
77+
if opts.Capacity <= 0 {
5178
panic("Capacity must be greater than 0")
5279
}
5380

5481
// number of hash functions (k)
55-
numHashFn := int(math.Ceil(math.Log2(1.0 / err_rate)))
82+
numHashFn := int(math.Ceil(math.Log2(1.0 / opts.Err_rate)))
5683

5784
//ln22 = ln2^2
5885
ln22 := math.Pow(math.Ln2, 2)
5986

6087
// M
61-
bit_width := int((float64(capacity) * math.Abs(math.Log(err_rate)) / ln22))
88+
bit_width := int((float64(opts.Capacity) * math.Abs(math.Log(opts.Err_rate)) / ln22))
6289

6390
//m
6491
bits_per_slice := bit_width / numHashFn
6592

6693
seeds := make([]int64, numHashFn)
6794
for i := 0; i < len(seeds); i++ {
68-
seeds[i] = int64((i + 1) << 16)
95+
seeds[i] = 64 << int64((i + 1))
96+
}
97+
98+
if opts.Path == "" {
99+
opts.Path = "/tmp/bloom.db"
100+
}
101+
102+
f, err := os.OpenFile(opts.Path, os.O_RDWR|os.O_CREATE, 0644)
103+
if err != nil {
104+
log.Fatalf("error opening file: %v", err)
105+
}
106+
107+
var b byte
108+
byteSize := int(unsafe.Sizeof(&b))
109+
110+
// we only need bit_width/8 bits, but only after calculating m
111+
bit_width /= byteSize
112+
bit_width += byteSize // add extra 1 byte to ensure we have a full byte at the end
113+
114+
if err := f.Truncate(int64(bit_width)); err != nil {
115+
log.Fatalf("Error truncating file: %s", err)
116+
}
117+
118+
mem, err := mmap.MapRegion(f, bit_width, mmap.RDWR, 0, 0)
119+
if err != nil {
120+
log.Fatalf("Mmap error: %v", err)
69121
}
70122

71123
return &BloomFilter{
72-
err_rate: err_rate,
73-
capacity: capacity,
124+
err_rate: opts.Err_rate,
125+
capacity: opts.Capacity,
74126
bit_width: bit_width,
75-
bit_array: make([]bool, bit_width),
127+
memFile: f,
128+
mem: mem,
76129
m: bits_per_slice,
77130
seeds: seeds,
78-
db: database,
131+
db: opts.Database,
132+
lock: sync.Mutex{},
133+
byteSize: byteSize,
134+
path: opts.Path,
79135
}
80136
}
81137

82-
func NewBloomFromFile(path string) {
83-
84-
}
85-
86138
// Add adds the key to the bloom filter
87139
func (bf *BloomFilter) Add(key, val []byte) {
140+
bf.lock.Lock()
141+
defer bf.lock.Unlock()
142+
defer func() {
143+
if r := recover(); r != nil {
144+
log.Panicf("Error adding key %s: %v", key, r)
145+
// os.Exit(1)
146+
}
147+
}()
88148

89149
indices := bf.candidates(string(key))
90150

@@ -93,7 +153,11 @@ func (bf *BloomFilter) Add(key, val []byte) {
93153
}
94154

95155
for i := 0; i < len(indices); i++ {
96-
bf.bit_array[indices[i]] = true
156+
idx, mask := bf.getBitIndexN(indices[i])
157+
158+
// set the bit at mask position of the byte at idx
159+
// e.g. if idx = 2 and mask = 01000000, set the bit at 2nd position of byte 2
160+
bf.mem[idx] |= mask
97161
}
98162
bf.count++
99163

@@ -103,19 +167,36 @@ func (bf *BloomFilter) Add(key, val []byte) {
103167

104168
}
105169

106-
// Find checks if the key exists in the bloom filter
107-
func (bf *BloomFilter) Find(key []byte) bool {
170+
// Contains checks if the key exists in the bloom filter
171+
func (bf *BloomFilter) Contains(key []byte) bool {
172+
defer func() {
173+
if r := recover(); r != nil {
174+
log.Panicf("Error finding key: %v", r)
175+
// os.Exit(1)
176+
}
177+
}()
178+
108179
indices := bf.candidates(string(key))
109-
return arrEvery(indices, bf.bit_array)
180+
181+
for i := 0; i < len(indices); i++ {
182+
idx, mask := bf.getBitIndexN(indices[i])
183+
bit := bf.mem[idx]
184+
185+
// check if the mask part of the bit is set
186+
if bit&mask == 0 {
187+
return false
188+
}
189+
}
190+
return true
110191
}
111192

112-
// Get Gets the key from the underlying persistent store
193+
// Get gets the key from the underlying persistent store
113194
func (bf *BloomFilter) Get(key []byte) []byte {
114195
if !bf.hasStore() {
115-
log.Panicf("BloomFilter has no persistent store. Use Find() instead")
196+
log.Panicf("BloomFilter has no persistent store. Use Contains() instead")
116197
}
117198

118-
if !bf.Find(key) {
199+
if !bf.Contains(key) {
119200
return nil
120201
}
121202

@@ -132,21 +213,39 @@ func (bf *BloomFilter) hasStore() bool {
132213
return bf.db != nil && bf.db.isReady()
133214
}
134215

135-
// every checks if each index in the indices array has a value of 1 in the bit array
136-
func arrEvery(indices []uint64, bits []bool) bool {
137-
allExists := true
138-
for _, idx := range indices {
139-
if !bits[idx] {
140-
allExists = false
141-
return allExists
142-
}
216+
// getBitIndex returns the index and mask for the bit. (unused)
217+
//
218+
// The first half of the bits are set at the beginning of the byte,
219+
// the second half at the end
220+
func (bf *BloomFilter) getBitIndex(idx uint64) (uint64, byte) {
221+
denom := uint64(bf.bit_width) / 2
222+
var mask byte
223+
if idx >= denom {
224+
mask = 0x0F // 00001111
225+
idx = idx % denom
226+
} else {
227+
mask = 0xF0 // 11110000
143228
}
144-
return allExists
229+
return idx, mask
145230
}
146231

147-
// candidates uses the hash function to return all index candidates of the given key
232+
// getBitIndexN returns the index and mask for the bit.
233+
func (bf *BloomFilter) getBitIndexN(idx uint64) (uint64, byte) {
234+
quot, rem := divmod(int64(idx), int64(bf.byteSize))
235+
236+
// shift the mask to the right by the remainder to get the bit index in the byte
237+
// if byteSize = 8,
238+
// 128 = 0x80 = 1000 0000, 128 >> 2 = 64.....and so on
239+
// 1000 0000 >> 2 = 0100 0000
240+
byteSizeInDec := int64(math.Pow(2, float64(bf.byteSize)-1))
241+
shift := byte((byteSizeInDec >> rem)) // 128 >> 1,2..
242+
243+
return uint64(quot), shift
244+
}
245+
246+
// candidates uses the hash function to get all index candidates of the given key
148247
func (bf *BloomFilter) candidates(key string) []uint64 {
149-
var res []uint64
248+
res := make([]uint64, 0, len(bf.seeds))
150249
for i, seed := range bf.seeds {
151250
hash := getHash(key, seed)
152251
// each hash produces an index over m for its respective slice.
@@ -159,9 +258,8 @@ func (bf *BloomFilter) candidates(key string) []uint64 {
159258

160259
// getHash returns the non-cryptographic murmur hash of the key seeded with the given seed
161260
func getHash(key string, seed int64) uint64 {
162-
hasher := murmur3.New64WithSeed(uint32(seed))
163-
hasher.Write([]byte(key))
164-
return hasher.Sum64()
261+
hash := murmur.Murmur3_64([]byte(key), uint64(seed))
262+
return hash
165263
}
166264

167265
// getBucketIndex returns the index of the bucket where the hash falls in
@@ -174,6 +272,21 @@ func (bf *BloomFilter) Capacity() int {
174272
return bf.capacity
175273
}
176274

275+
// Close closes the file handle to the filter and the persistent store (if any)
276+
func (bf *BloomFilter) Close() error {
277+
if err := bf.mem.Flush(); err != nil {
278+
_ = bf.memFile.Close()
279+
return err
280+
}
281+
282+
if err := bf.mem.Unmap(); err != nil {
283+
_ = bf.memFile.Close()
284+
return err
285+
}
286+
287+
return bf.memFile.Close()
288+
}
289+
177290
// Count returns the number of items added to the bloom filter
178291
func (bf *BloomFilter) Count() int {
179292
return bf.count
@@ -183,3 +296,10 @@ func (bf *BloomFilter) Count() int {
183296
func (bf *BloomFilter) FilterSize() int {
184297
return bf.bit_width
185298
}
299+
300+
// divmod returns the quotient and remainder of num/denom.
//
// Both follow Go's truncated integer division, so the remainder carries the
// sign of num. It panics if denom is zero.
func divmod(num, denom int64) (quot, rem int64) {
	return num / denom, num % denom
}

0 commit comments

Comments
 (0)