Skip to content

Commit 5aac983

Browse files
authored
Merge pull request #273 from ellemouton/useQueryDispatcher
query+neutrino: use query dispatcher for GetBlock and GetCFilter
2 parents 9fd0fc5 + 663cacc commit 5aac983

File tree

10 files changed

+441
-489
lines changed

10 files changed

+441
-489
lines changed

banman/reason.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ type Reason uint8
66
// We prevent using `iota` to ensure the order does not have the value since
77
// these are serialized within the database.
88
const (
9-
// ExcedeedBanThreshold signals that a peer exceeded its ban threshold.
9+
// ExceededBanThreshold signals that a peer exceeded its ban threshold.
1010
ExceededBanThreshold Reason = 1
1111

1212
// NoCompactFilters signals that a peer was unable to serve us compact
@@ -20,6 +20,9 @@ const (
2020
// InvalidFilterHeaderCheckpoint signals that a peer served us an
2121
// invalid filter header checkpoint.
2222
InvalidFilterHeaderCheckpoint Reason = 4
23+
24+
// InvalidBlock signals that a peer served us a bad block.
25+
InvalidBlock Reason = 5
2326
)
2427

2528
// String returns a human-readable description for the reason a peer was banned.
@@ -37,6 +40,9 @@ func (r Reason) String() string {
3740
case InvalidFilterHeaderCheckpoint:
3841
return "peer served invalid filter header checkpoint"
3942

43+
case InvalidBlock:
44+
return "peer served an invalid block"
45+
4046
default:
4147
return "unknown reason"
4248
}

blockmanager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1048,7 +1048,7 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
10481048
// Hand the queries to the work manager, and consume the verified
10491049
// responses as they come back.
10501050
errChan := b.cfg.QueryDispatcher.Query(
1051-
q.requests(), query.Cancel(b.quit),
1051+
q.requests(), query.Cancel(b.quit), query.NoRetryMax(),
10521052
)
10531053

10541054
// Keep waiting for more headers as long as we haven't received an

blockmanager_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ const (
3737
// mockDispatcher implements the query.Dispatcher interface and allows us to
3838
// set up a custom Query method during tests.
3939
type mockDispatcher struct {
40+
query.WorkManager
41+
4042
query func(requests []*query.Request,
4143
options ...query.QueryOption) chan error
4244
}

neutrino.go

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -643,11 +643,6 @@ type ChainService struct { // nolint:maligned
643643
FilterCache *lru.Cache[FilterCacheKey, *CacheableFilter]
644644
BlockCache *lru.Cache[wire.InvVect, *CacheableBlock]
645645

646-
// queryPeers will be called to send messages to one or more peers,
647-
// expecting a response.
648-
queryPeers func(wire.Message, func(*ServerPeer, wire.Message,
649-
chan<- struct{}), ...QueryOption)
650-
651646
chainParams chaincfg.Params
652647
addrManager *addrmgr.AddrManager
653648
connManager *connmgr.ConnManager
@@ -665,7 +660,7 @@ type ChainService struct { // nolint:maligned
665660
utxoScanner *UtxoScanner
666661
broadcaster *pushtx.Broadcaster
667662
banStore banman.Store
668-
workManager *query.WorkManager
663+
workManager query.WorkManager
669664

670665
// peerSubscribers is a slice of active peer subscriptions, that we
671666
// will notify each time a new peer is connected.
@@ -741,22 +736,13 @@ func NewChainService(cfg Config) (*ChainService, error) {
741736
persistToDisk: cfg.PersistToDisk,
742737
broadcastTimeout: cfg.BroadcastTimeout,
743738
}
744-
s.workManager = query.New(&query.Config{
739+
s.workManager = query.NewWorkManager(&query.Config{
745740
ConnectedPeers: s.ConnectedPeers,
746741
NewWorker: query.NewWorker,
747742
Ranking: query.NewPeerRanking(),
748743
})
749744

750-
// We set the queryPeers method to point to queryChainServicePeers,
751-
// passing a reference to the newly created ChainService.
752-
s.queryPeers = func(msg wire.Message, f func(*ServerPeer,
753-
wire.Message, chan<- struct{}), qo ...QueryOption) {
754-
755-
queryChainServicePeers(&s, msg, f, qo...)
756-
}
757-
758745
var err error
759-
760746
s.FilterDB, err = filterdb.New(cfg.Database, cfg.ChainParams)
761747
if err != nil {
762748
return nil, err

0 commit comments

Comments
 (0)