Skip to content

Commit be82e09

Browse files
committed
place advisory lock on db
1 parent 8b0e285 commit be82e09

File tree

8 files changed

+300
-48
lines changed

8 files changed

+300
-48
lines changed

bloom.go

Lines changed: 69 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/dsa0x/sprout/pkg/murmur"
1212
"github.com/edsrzf/mmap-go"
13+
"github.com/juju/fslock"
1314
)
1415

1516
type BloomFilter struct {
@@ -33,6 +34,7 @@ type BloomFilter struct {
3334
mem mmap.MMap
3435
pageOffset int
3536
lock sync.Mutex
37+
flock *fslock.Lock
3638
byteSize int
3739

3840
// m is the number bits per slice(hashFn)
@@ -140,7 +142,17 @@ func NewBloom(opts *BloomOptions) *BloomFilter {
140142
opts: opts,
141143
}
142144

143-
err := bf.mmap()
145+
// initialize advisory lock
146+
bf.flock = fslock.New(bf.path)
147+
148+
// open the file
149+
err := bf.openFile()
150+
if err != nil {
151+
log.Fatalf("Error opening file: %v", err)
152+
}
153+
154+
// open mmap the file
155+
err = bf.mmap()
144156
if err != nil {
145157
log.Fatalf("Mmap error: %v", err)
146158
}
@@ -251,21 +263,6 @@ func (bf *BloomFilter) hasStore() bool {
251263
return bf.db != nil && bf.db.isReady()
252264
}
253265

254-
func (bf *BloomFilter) unmap() error {
255-
var err error
256-
if bf.mem != nil {
257-
err = bf.mem.Unmap()
258-
if err != nil {
259-
_ = bf.memFile.Close()
260-
return err
261-
}
262-
}
263-
if bf.memFile != nil {
264-
return bf.memFile.Close()
265-
}
266-
return nil
267-
}
268-
269266
// getBitIndex returns the index and mask for the bit. (unused)
270267
//
271268
// The first half of the bits are set at the beginning of the byte,
@@ -325,13 +322,27 @@ func (bf *BloomFilter) Capacity() int {
325322
return bf.capacity
326323
}
327324

328-
// Close closes the file handle to the filter and the persistent store (if any)
325+
// Close flushes the file to disk and closes the file handle to the filter
329326
func (bf *BloomFilter) Close() error {
330327
if err := bf.mem.Flush(); err != nil {
331328
_ = bf.memFile.Close()
332329
return err
333330
}
334-
return bf.unmap()
331+
if err := bf.unmap(); err != nil {
332+
_ = bf.memFile.Close()
333+
return err
334+
}
335+
if bf.flock != nil {
336+
err := bf.flock.Unlock()
337+
if err != nil {
338+
return err
339+
}
340+
}
341+
342+
if bf.memFile != nil {
343+
return bf.memFile.Close()
344+
}
345+
return nil
335346
}
336347

337348
// Count returns the number of items added to the bloom filter
@@ -351,9 +362,14 @@ func (bf *BloomFilter) DB() interface{} {
351362

352363
// Clear resets all bits in the bloom filter
353364
func (bf *BloomFilter) Clear() {
354-
bf.lock.Lock()
355-
defer bf.lock.Unlock()
356-
bf.mem = make([]byte, bf.bit_width)
365+
mem := make([]byte, bf.bit_width)
366+
copy(bf.mem, mem)
367+
err := bf.mem.Flush()
368+
if err != nil {
369+
fmt.Printf("Error flushing filter to disk: %s\n", err)
370+
os.Exit(1)
371+
}
372+
bf.count = 0
357373
}
358374

359375
type BloomFilterStats struct {
@@ -379,21 +395,47 @@ func (bf *BloomFilter) Stats() BloomFilterStats {
379395
}
380396
}
381397

382-
// mmap opens a the filter file and maps it into memory
383-
func (bf *BloomFilter) mmap() error {
398+
func (bf *BloomFilter) unmap() error {
384399
var err error
385-
bf.memFile, err = os.OpenFile(bf.path, os.O_RDWR|os.O_CREATE, 0666)
386-
if err != nil {
387-
return fmt.Errorf("Unable to open bloom filter file: %s", err)
400+
if bf.mem != nil {
401+
err = bf.mem.Unmap()
402+
if err != nil {
403+
return err
404+
}
388405
}
406+
return nil
407+
}
408+
409+
// mmap opens the filter file and maps it into memory
410+
func (bf *BloomFilter) mmap() error {
411+
412+
var err error
389413

390414
if err := bf.memFile.Truncate(int64(bf.opts.dataSize)); err != nil {
391415
log.Fatalf("Error truncating file: %s", err)
392416
}
393417

394418
bf.mem, err = mmap.MapRegion(bf.memFile, bf.opts.dataSize, mmap.RDWR, 0, 0)
395419
if err != nil {
396-
return fmt.Errorf("Unable to mmap bloom filter file: %s", err)
420+
return fmt.Errorf("unable to mmap bloom filter file: %s", err)
421+
}
422+
423+
return nil
424+
}
425+
426+
// openFile opens the filter file and locks it
427+
func (bf *BloomFilter) openFile() error {
428+
var err error
429+
bf.memFile, err = os.OpenFile(bf.path, os.O_RDWR|os.O_CREATE, 0666)
430+
if err != nil {
431+
return fmt.Errorf("unable to open bloom filter file: %s", err)
432+
}
433+
434+
if err := bf.flock.TryLock(); err != nil {
435+
if err == fslock.ErrLocked {
436+
return fmt.Errorf("file is locked by another process")
437+
}
438+
return fmt.Errorf("unable to lock bloom filter file: %s", err)
397439
}
398440

399441
return nil

bloom_test.go

Lines changed: 102 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"encoding/binary"
55
"fmt"
66
"os"
7+
"os/exec"
78
"testing"
89

910
"github.com/dgraph-io/badger/v3"
@@ -52,7 +53,7 @@ func TestBloomFilter_Add(t *testing.T) {
5253
opts := &BloomOptions{
5354
Err_rate: 0.01,
5455
Capacity: 110000,
55-
Path: "./test.db",
56+
Path: "./test1.db",
5657
}
5758
bf := NewBloom(opts)
5859

@@ -84,7 +85,7 @@ func TestBloomFilter_Add(t *testing.T) {
8485
opts := &BloomOptions{
8586
Err_rate: 0.01,
8687
Capacity: 1000,
87-
Path: "./test.db",
88+
Path: "./test2.db",
8889
}
8990
bf := NewBloom(opts)
9091

@@ -112,7 +113,7 @@ func TestBloomFilter_Add(t *testing.T) {
112113
opts := &BloomOptions{
113114
Err_rate: 0.1,
114115
Capacity: 100,
115-
Path: "./test.db",
116+
Path: "./test3.db",
116117
}
117118
bf := NewBloom(opts)
118119

@@ -134,12 +135,15 @@ func TestBloomFilter_Merge(t *testing.T) {
134135
Path: "./test.db",
135136
}
136137
bf := NewBloom(opts)
137-
bf2 := NewBloom(opts)
138+
opts2 := *opts
139+
opts2.Path = "./test2.db"
140+
bf2 := NewBloom(&opts2)
138141

139142
defer func() {
140143
bf.Close()
141144
bf2.Close()
142145
os.Remove(opts.Path)
146+
os.Remove(opts2.Path)
143147
}()
144148

145149
t.Run("merge success", func(t *testing.T) {
@@ -153,11 +157,12 @@ func TestBloomFilter_Merge(t *testing.T) {
153157
opts := &BloomOptions{
154158
Err_rate: 0.01,
155159
Capacity: 10000,
156-
Path: "./test.db",
160+
Path: "./test3.db",
157161
}
158162
bf2 := NewBloom(opts)
159163
bf.Merge(bf2)
160164
defer func() {
165+
bf.Close()
161166
bf2.Close()
162167
os.Remove(opts.Path)
163168
}()
@@ -170,8 +175,15 @@ func TestBloomFilter_Merge(t *testing.T) {
170175

171176
t.Run("object added to the single filters should be found in the resulting merge", func(t *testing.T) {
172177
key := []byte("foo")
178+
opts := &BloomOptions{
179+
Err_rate: 0.01,
180+
Capacity: 1000,
181+
Path: "./test4.db",
182+
}
173183
bf := NewBloom(opts)
174-
bf2 := NewBloom(opts)
184+
opts2 := *opts
185+
opts2.Path = "./test5.db"
186+
bf2 := NewBloom(&opts2)
175187
bf2.Add(key)
176188
err := bf.Merge(bf2)
177189
if err != nil {
@@ -180,6 +192,12 @@ func TestBloomFilter_Merge(t *testing.T) {
180192
if !bf.Contains(key) {
181193
t.Errorf("Expected key %s to be found in the merged filter", string(key))
182194
}
195+
defer func() {
196+
bf.Close()
197+
bf2.Close()
198+
os.Remove(opts.Path)
199+
os.Remove(opts2.Path)
200+
}()
183201
})
184202

185203
}
@@ -309,3 +327,81 @@ func assertPanic(t *testing.T, fn func()) {
309327
fn()
310328
t.Errorf("The code did not panic")
311329
}
330+
331+
func TestBloomFilter_Clear(t *testing.T) {
332+
store, cleanupFunc := DBSetupTest(t)
333+
defer cleanupFunc()
334+
opts := &BloomOptions{
335+
Err_rate: 0.01,
336+
Capacity: 1000,
337+
Database: store,
338+
Path: "./test.db",
339+
}
340+
bf := NewBloom(opts)
341+
342+
defer func() {
343+
bf.Close()
344+
os.Remove(opts.Path)
345+
}()
346+
347+
for i := 0; i < opts.Capacity/2; i++ {
348+
bf.Add([]byte(fmt.Sprintf("foo%d", i)))
349+
}
350+
351+
keys := []string{"foo", "baz", "bar"}
352+
for _, key := range keys {
353+
bf.Add([]byte(key))
354+
}
355+
356+
t.Run("should clear the bloom filter", func(t *testing.T) {
357+
bf.Clear()
358+
for _, key := range keys {
359+
if bf.Contains([]byte(key)) {
360+
t.Errorf("Expected key to not be found in bloom filter after clear")
361+
}
362+
}
363+
})
364+
365+
for i := 0; i < opts.Capacity/2; i++ {
366+
bf.Add([]byte(fmt.Sprintf("foo%d", i)))
367+
}
368+
369+
for _, key := range keys {
370+
bf.Add([]byte(key))
371+
t.Run("Should find the newly added keys", func(t *testing.T) {
372+
if !bf.Contains([]byte(key)) {
373+
t.Errorf("Expected key to be found in bloom filter after clear")
374+
}
375+
})
376+
}
377+
}
378+
func TestBloomFilter_FileLock(t *testing.T) {
379+
t.Run("should clear the bloom filter", func(t *testing.T) {
380+
store, cleanupFunc := DBSetupTest(t)
381+
defer cleanupFunc()
382+
if os.Getenv("SPROUT_LOCK") == "1" {
383+
opts := &BloomOptions{
384+
Err_rate: 0.01,
385+
Capacity: 1000,
386+
Database: store,
387+
Path: "./test.db",
388+
}
389+
bf := NewBloom(opts)
390+
defer func() {
391+
bf.Close()
392+
os.Remove(opts.Path)
393+
}()
394+
NewBloom(opts)
395+
return
396+
}
397+
398+
cmd := exec.Command(os.Args[0], "-test.run=TestBloomFilter_FileLock")
399+
cmd.Env = append(os.Environ(), "SPROUT_LOCK=1")
400+
err := cmd.Run()
401+
if e, ok := err.(*exec.ExitError); ok && !e.Success() {
402+
return
403+
}
404+
t.Fatalf("expected file lock error, got none")
405+
})
406+
407+
}

0 commit comments

Comments
 (0)