Skip to content

Commit 4cd9538

Browse files
committed
neutrino+query: use BatchWriter for filter persistance
1 parent 6b17fb3 commit 4cd9538

File tree

2 files changed

+27
-7
lines changed

2 files changed

+27
-7
lines changed

neutrino.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/lightninglabs/neutrino/banman"
2626
"github.com/lightninglabs/neutrino/blockntfns"
2727
"github.com/lightninglabs/neutrino/cache/lru"
28+
"github.com/lightninglabs/neutrino/chanutils"
2829
"github.com/lightninglabs/neutrino/filterdb"
2930
"github.com/lightninglabs/neutrino/headerfs"
3031
"github.com/lightninglabs/neutrino/pushtx"
@@ -661,6 +662,7 @@ type ChainService struct { // nolint:maligned
661662
broadcaster *pushtx.Broadcaster
662663
banStore banman.Store
663664
workManager query.WorkManager
665+
filterBatchWriter *chanutils.BatchWriter[*filterdb.FilterData]
664666

665667
// peerSubscribers is a slice of active peer subscriptions, that we
666668
// will notify each time a new peer is connected.
@@ -748,6 +750,21 @@ func NewChainService(cfg Config) (*ChainService, error) {
748750
return nil, err
749751
}
750752

753+
if s.persistToDisk {
754+
cfg := &chanutils.BatchWriterConfig[*filterdb.FilterData]{
755+
QueueBufferSize: chanutils.DefaultQueueSize,
756+
MaxBatch: 1000,
757+
DBWritesTickerDuration: time.Millisecond * 500,
758+
PutItems: s.FilterDB.PutFilters,
759+
}
760+
761+
batchWriter := chanutils.NewBatchWriter[*filterdb.FilterData](
762+
cfg,
763+
)
764+
765+
s.filterBatchWriter = batchWriter
766+
}
767+
751768
filterCacheSize := DefaultFilterCacheSize
752769
if cfg.FilterCacheSize != 0 {
753770
filterCacheSize = cfg.FilterCacheSize
@@ -1606,6 +1623,10 @@ func (s *ChainService) Start() error {
16061623
err)
16071624
}
16081625

1626+
if s.persistToDisk {
1627+
s.filterBatchWriter.Start()
1628+
}
1629+
16091630
go s.connManager.Start()
16101631

16111632
// Start the peer handler which in turn starts the address and block
@@ -1645,6 +1666,10 @@ func (s *ChainService) Stop() error {
16451666
returnErr = err
16461667
}
16471668

1669+
if s.persistToDisk {
1670+
s.filterBatchWriter.Stop()
1671+
}
1672+
16481673
// Signal the remaining goroutines to quit.
16491674
close(s.quit)
16501675
s.wg.Wait()

query.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -534,16 +534,11 @@ func (q *cfiltersQuery) handleResponse(req, resp wire.Message,
534534
}
535535

536536
if q.cs.persistToDisk {
537-
filterData := &filterdb.FilterData{
537+
q.cs.filterBatchWriter.AddItem(&filterdb.FilterData{
538538
Filter: filter,
539539
BlockHash: &response.BlockHash,
540540
Type: dbFilterType,
541-
}
542-
543-
err = q.cs.FilterDB.PutFilters(filterData)
544-
if err != nil {
545-
log.Warnf("Couldn't write filter to filterDB: %v", err)
546-
}
541+
})
547542
}
548543

549544
// We delete the entry for this filter from the headerIndex to indicate

0 commit comments

Comments
 (0)