
Commit f8b6f23

Merge pull request #274 from ellemouton/batchFilterWriter
multi: introduce a batch filter writer
2 parents 5aac983 + 4cd9538 commit f8b6f23

10 files changed: +690 -112 lines

chanutils/batch_writer.go

Lines changed: 149 additions & 0 deletions
@@ -0,0 +1,149 @@
package chanutils

import (
	"sync"
	"time"
)

// BatchWriterConfig holds the configuration options for BatchWriter.
type BatchWriterConfig[T any] struct {
	// QueueBufferSize sets the buffer size of the output channel of the
	// concurrent queue used by the BatchWriter.
	QueueBufferSize int

	// MaxBatch is the maximum number of filters to be persisted to the DB
	// in one go.
	MaxBatch int

	// DBWritesTickerDuration is the time after receiving a filter that the
	// writer will wait for more filters before writing the current batch
	// to the DB.
	DBWritesTickerDuration time.Duration

	// PutItems will be used by the BatchWriter to persist filters in
	// batches.
	PutItems func(...T) error
}

// BatchWriter manages writing Filters to the DB and tries to batch the writes
// as much as possible.
type BatchWriter[T any] struct {
	started sync.Once
	stopped sync.Once

	cfg *BatchWriterConfig[T]

	queue *ConcurrentQueue[T]

	quit chan struct{}
	wg   sync.WaitGroup
}

// NewBatchWriter constructs a new BatchWriter using the given
// BatchWriterConfig.
func NewBatchWriter[T any](cfg *BatchWriterConfig[T]) *BatchWriter[T] {
	return &BatchWriter[T]{
		cfg:   cfg,
		queue: NewConcurrentQueue[T](cfg.QueueBufferSize),
		quit:  make(chan struct{}),
	}
}

// Start starts the BatchWriter.
func (b *BatchWriter[T]) Start() {
	b.started.Do(func() {
		b.queue.Start()

		b.wg.Add(1)
		go b.manageNewItems()
	})
}

// Stop stops the BatchWriter.
func (b *BatchWriter[T]) Stop() {
	b.stopped.Do(func() {
		close(b.quit)
		b.wg.Wait()

		b.queue.Stop()
	})
}

// AddItem adds a given item to the BatchWriter queue.
func (b *BatchWriter[T]) AddItem(item T) {
	b.queue.ChanIn() <- item
}

// manageNewItems manages collecting filters and persisting them to the DB.
// There are two conditions for writing a batch of filters to the DB: the first
// is if a certain threshold (MaxBatch) of filters has been collected and the
// other is if at least one filter has been collected and a timeout has been
// reached.
//
// NOTE: this must be run in a goroutine.
func (b *BatchWriter[T]) manageNewItems() {
	defer b.wg.Done()

	batch := make([]T, 0, b.cfg.MaxBatch)

	// writeBatch writes the current contents of the batch slice to the
	// filters DB.
	writeBatch := func() {
		if len(batch) == 0 {
			return
		}

		err := b.cfg.PutItems(batch...)
		if err != nil {
			log.Errorf("Could not write filters to filterDB: %v",
				err)
		}

		// Empty the batch slice.
		batch = make([]T, 0, b.cfg.MaxBatch)
	}

	ticker := time.NewTicker(b.cfg.DBWritesTickerDuration)
	defer ticker.Stop()

	// Stop the ticker since we don't want it to tick unless there is at
	// least one item in the queue.
	ticker.Stop()

	for {
		select {
		case filter, ok := <-b.queue.ChanOut():
			if !ok {
				return
			}

			batch = append(batch, filter)

			switch len(batch) {
			// If the batch slice is full, we stop the ticker and
			// write the batch contents to disk.
			case b.cfg.MaxBatch:
				ticker.Stop()
				writeBatch()

			// If an item is added to the batch, we reset the timer.
			// This ensures that if the batch threshold is not met
			// then items are still persisted in a timely manner.
			default:
				ticker.Reset(b.cfg.DBWritesTickerDuration)
			}

		case <-ticker.C:
			// If the ticker ticks, then we stop it and write the
			// current batch contents to the db. If any more items
			// are added, the ticker will be reset.
			ticker.Stop()
			writeBatch()

		case <-b.quit:
			writeBatch()

			return
		}
	}
}
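
As a usage note (not part of this commit): a minimal sketch of how a caller might wire up the new BatchWriter. The filterEntry type, the putFilters callback, and the neutrino import path are illustrative assumptions, not code from this change.

package main

import (
	"time"

	"github.com/lightninglabs/neutrino/chanutils"
)

// filterEntry is a hypothetical item type standing in for whatever the caller
// wants to batch, e.g. a compact filter keyed by block height.
type filterEntry struct {
	height uint32
}

// putFilters is a hypothetical batched DB write; a real implementation would
// persist all the given filters in a single transaction.
func putFilters(filters ...*filterEntry) error {
	return nil
}

func main() {
	w := chanutils.NewBatchWriter[*filterEntry](&chanutils.BatchWriterConfig[*filterEntry]{
		QueueBufferSize:        100,
		MaxBatch:               1000,
		DBWritesTickerDuration: time.Millisecond * 500,
		PutItems:               putFilters,
	})
	w.Start()

	// Stop flushes whatever is still sitting in the current batch before
	// returning.
	defer w.Stop()

	// AddItem hands each item to the internal queue without blocking on
	// the DB write; the writer flushes once MaxBatch items have been
	// collected, or when the ticker fires after the most recent item.
	for h := uint32(0); h < 10; h++ {
		w.AddItem(&filterEntry{height: h})
	}
}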

chanutils/batch_writer_test.go

Lines changed: 210 additions & 0 deletions
@@ -0,0 +1,210 @@
package chanutils

import (
	"fmt"
	"math/rand"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

const waitTime = time.Second * 5

// TestBatchWriter tests that the BatchWriter behaves as expected.
func TestBatchWriter(t *testing.T) {
	t.Parallel()
	rand.Seed(time.Now().UnixNano())

	// waitForItems is a helper function that will wait for a given set of
	// items to appear in the db.
	waitForItems := func(db *mockItemsDB, items ...*item) {
		err := waitFor(func() bool {
			return db.hasItems(items...)
		}, waitTime)
		require.NoError(t, err)
	}

	t.Run("filters persisted after ticker", func(t *testing.T) {
		t.Parallel()

		// Create a mock filters DB.
		db := newMockItemsDB()

		// Construct a new BatchWriter backed by the mock db.
		b := NewBatchWriter[*item](&BatchWriterConfig[*item]{
			QueueBufferSize:        10,
			MaxBatch:               20,
			DBWritesTickerDuration: time.Millisecond * 500,
			PutItems:               db.PutItems,
		})
		b.Start()
		t.Cleanup(b.Stop)

		fs := genFilterSet(5)
		for _, f := range fs {
			b.AddItem(f)
		}
		waitForItems(db, fs...)
	})

	t.Run("write once threshold is reached", func(t *testing.T) {
		t.Parallel()

		// Create a mock filters DB.
		db := newMockItemsDB()

		// Construct a new BatchWriter backed by the mock db.
		// Make the DB writes ticker duration extra long so that we
		// can explicitly test that the batch gets persisted if the
		// MaxBatch threshold is reached.
		b := NewBatchWriter[*item](&BatchWriterConfig[*item]{
			QueueBufferSize:        10,
			MaxBatch:               20,
			DBWritesTickerDuration: time.Hour,
			PutItems:               db.PutItems,
		})
		b.Start()
		t.Cleanup(b.Stop)

		// Generate 30 filters and add each one to the batch writer.
		fs := genFilterSet(30)
		for _, f := range fs {
			b.AddItem(f)
		}

		// Since the MaxBatch threshold has been reached, we expect the
		// first 20 filters to be persisted.
		waitForItems(db, fs[:20]...)

		// Since the last 10 filters don't reach the threshold and since
		// the ticker has definitely not ticked yet, we don't expect the
		// last 10 filters to be in the db yet.
		require.False(t, db.hasItems(fs[21:]...))
	})

	t.Run("stress test", func(t *testing.T) {
		t.Parallel()

		// Create a mock filters DB.
		db := newMockItemsDB()

		// Construct a new BatchWriter backed by the mock db. Use a
		// small MaxBatch and a short ticker duration so that both
		// the threshold write path and the timeout write path are
		// exercised.
		b := NewBatchWriter[*item](&BatchWriterConfig[*item]{
			QueueBufferSize:        5,
			MaxBatch:               5,
			DBWritesTickerDuration: time.Millisecond * 2,
			PutItems:               db.PutItems,
		})
		b.Start()
		t.Cleanup(b.Stop)

		// Generate lots of filters and add each to the batch writer.
		// Sleep for a bit between each filter to ensure that we
		// sometimes hit the timeout write and sometimes the threshold
		// write.
		fs := genFilterSet(1000)
		for _, f := range fs {
			b.AddItem(f)

			n := rand.Intn(3)
			time.Sleep(time.Duration(n) * time.Millisecond)
		}

		// Eventually all the filters should be persisted, some via
		// the threshold write and some via the timeout write.
		waitForItems(db, fs...)
	})
}

type item struct {
	i int
}

// mockItemsDB is a mock DB that holds a set of items.
type mockItemsDB struct {
	items map[int]bool
	mu    sync.Mutex
}

// newMockItemsDB constructs a new mockItemsDB.
func newMockItemsDB() *mockItemsDB {
	return &mockItemsDB{
		items: make(map[int]bool),
	}
}

// hasItems returns true if the db contains all the given items.
func (m *mockItemsDB) hasItems(items ...*item) bool {
	m.mu.Lock()
	defer m.mu.Unlock()

	for _, i := range items {
		_, ok := m.items[i.i]
		if !ok {
			return false
		}
	}

	return true
}

// PutItems adds a set of items to the db.
func (m *mockItemsDB) PutItems(items ...*item) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	for _, i := range items {
		m.items[i.i] = true
	}

	return nil
}

// genFilterSet generates a set of numFilters items.
func genFilterSet(numFilters int) []*item {
	res := make([]*item, numFilters)
	for i := 0; i < numFilters; i++ {
		res[i] = &item{i: i}
	}

	return res
}

// pollInterval is a constant specifying a 200 ms interval.
const pollInterval = 200 * time.Millisecond

// waitFor is a helper test function that will wait for a timeout period of
// time until the passed predicate returns true. This function is helpful as
// timing doesn't always line up well when running integration tests with
// several running lnd nodes. This function gives callers a way to assert that
// some property is upheld within a particular time frame.
func waitFor(pred func() bool, timeout time.Duration) error {
	exitTimer := time.After(timeout)
	result := make(chan bool, 1)

	for {
		<-time.After(pollInterval)

		go func() {
			result <- pred()
		}()

		// Each time we call the pred(), we expect a result to be
		// returned otherwise it will timeout.
		select {
		case <-exitTimer:
			return fmt.Errorf("predicate not satisfied after " +
				"time out")

		case succeed := <-result:
			if succeed {
				return nil
			}
		}
	}
}

chanutils/log.go

Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
package chanutils

import "github.com/btcsuite/btclog"

// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger

// The default amount of logging is none.
func init() {
	DisableLog()
}

// DisableLog disables all library log output. Logging output is disabled
// by default until either UseLogger or SetLogWriter are called.
func DisableLog() {
	UseLogger(btclog.Disabled)
}

// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
	log = logger
}
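
The package logs nothing until a consumer installs a logger, so errors from the batched DB writes (the "Could not write filters" case above) are silent by default. A minimal sketch of enabling output, assuming the standard btclog backend; the "CHUT" subsystem tag and the neutrino import path are illustrative choices, not defined by this commit.

package main

import (
	"os"

	"github.com/btcsuite/btclog"
	"github.com/lightninglabs/neutrino/chanutils"
)

func main() {
	// Route chanutils log output to stdout at debug level.
	backend := btclog.NewBackend(os.Stdout)
	logger := backend.Logger("CHUT")
	logger.SetLevel(btclog.LevelDebug)

	chanutils.UseLogger(logger)
}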
