Skip to content

Commit 663cacc

Browse files
committed
query+neutrino: remove unused queryPeers function
Since both GetBlock and GetCFilter now make use of the work dispatcher instead of the `queryPeers` function, we can now remove the `queryPeers` and `queryChainServicePeers` functions.
1 parent 22311cb commit 663cacc

File tree

2 files changed

+0
-176
lines changed

2 files changed

+0
-176
lines changed

neutrino.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -643,11 +643,6 @@ type ChainService struct { // nolint:maligned
643643
FilterCache *lru.Cache[FilterCacheKey, *CacheableFilter]
644644
BlockCache *lru.Cache[wire.InvVect, *CacheableBlock]
645645

646-
// queryPeers will be called to send messages to one or more peers,
647-
// expecting a response.
648-
queryPeers func(wire.Message, func(*ServerPeer, wire.Message,
649-
chan<- struct{}), ...QueryOption)
650-
651646
chainParams chaincfg.Params
652647
addrManager *addrmgr.AddrManager
653648
connManager *connmgr.ConnManager
@@ -747,16 +742,7 @@ func NewChainService(cfg Config) (*ChainService, error) {
747742
Ranking: query.NewPeerRanking(),
748743
})
749744

750-
// We set the queryPeers method to point to queryChainServicePeers,
751-
// passing a reference to the newly created ChainService.
752-
s.queryPeers = func(msg wire.Message, f func(*ServerPeer,
753-
wire.Message, chan<- struct{}), qo ...QueryOption) {
754-
755-
queryChainServicePeers(&s, msg, f, qo...)
756-
}
757-
758745
var err error
759-
760746
s.FilterDB, err = filterdb.New(cfg.Database, cfg.ChainParams)
761747
if err != nil {
762748
return nil, err

query.go

Lines changed: 0 additions & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -389,168 +389,6 @@ checkResponses:
389389
}
390390
}
391391

392-
// queryChainServicePeers is a helper function that sends a query to one or
393-
// more peers of the given ChainService, and waits for an answer. The timeout
394-
// for queries is set by the QueryTimeout package-level variable or the Timeout
395-
// functional option.
396-
func queryChainServicePeers(
397-
// s is the ChainService to use.
398-
s *ChainService,
399-
400-
// queryMsg is the message to send to each peer selected by selectPeer.
401-
queryMsg wire.Message,
402-
403-
// checkResponse is called for every message within the timeout period.
404-
// The quit channel lets the query know to terminate because the
405-
// required response has been found. This is done by closing the
406-
// channel.
407-
checkResponse func(sp *ServerPeer, resp wire.Message,
408-
quit chan<- struct{}),
409-
410-
// options takes functional options for executing the query.
411-
options ...QueryOption) {
412-
413-
// Starting with the set of default options, we'll apply any specified
414-
// functional options to the query.
415-
qo := defaultQueryOptions()
416-
qo.applyQueryOptions(options...)
417-
418-
// We get an initial view of our peers, to be updated each time a peer
419-
// query times out.
420-
queryPeer := s.blockManager.SyncPeer()
421-
peerTries := make(map[string]uint8)
422-
423-
// This will be state used by the peer query goroutine.
424-
queryQuit := make(chan struct{})
425-
subQuit := make(chan struct{})
426-
427-
// Increase this number to be able to handle more queries at once as
428-
// each channel gets results for all queries, otherwise messages can
429-
// get mixed and there's a vicious cycle of retries causing a bigger
430-
// message flood, more of which get missed.
431-
msgChan := make(chan spMsg)
432-
subscription := spMsgSubscription{
433-
msgChan: msgChan,
434-
quitChan: subQuit,
435-
}
436-
437-
// Loop for any messages sent to us via our subscription channel and
438-
// check them for whether they satisfy the query. Break the loop if
439-
// it's time to quit.
440-
peerTimeout := time.NewTimer(qo.timeout)
441-
connectionTimeout := time.NewTimer(qo.peerConnectTimeout)
442-
connectionTicker := connectionTimeout.C
443-
if queryPeer != nil {
444-
peerTries[queryPeer.Addr()]++
445-
queryPeer.subscribeRecvMsg(subscription)
446-
queryPeer.QueueMessageWithEncoding(queryMsg, nil, qo.encoding)
447-
}
448-
checkResponses:
449-
for {
450-
select {
451-
case <-connectionTicker:
452-
// When we time out, we're done.
453-
if queryPeer != nil {
454-
queryPeer.unsubscribeRecvMsgs(subscription)
455-
}
456-
break checkResponses
457-
458-
case <-queryQuit:
459-
// Same when we get a quit signal.
460-
if queryPeer != nil {
461-
queryPeer.unsubscribeRecvMsgs(subscription)
462-
}
463-
break checkResponses
464-
465-
case <-s.quit:
466-
// Same when chain server's quit is signaled.
467-
if queryPeer != nil {
468-
queryPeer.unsubscribeRecvMsgs(subscription)
469-
}
470-
break checkResponses
471-
472-
// A message has arrived over the subscription channel, so we
473-
// execute the checkResponses callback to see if this ends our
474-
// query session.
475-
case sm := <-msgChan:
476-
// TODO: This will get stuck if checkResponse gets
477-
// stuck. This is a caveat for callers that should be
478-
// fixed before exposing this function for public use.
479-
checkResponse(sm.sp, sm.msg, queryQuit)
480-
481-
// Each time we receive a response from the current
482-
// peer, we'll reset the main peer timeout as they're
483-
// being responsive.
484-
if !peerTimeout.Stop() {
485-
select {
486-
case <-peerTimeout.C:
487-
default:
488-
}
489-
}
490-
peerTimeout.Reset(qo.timeout)
491-
492-
// Also at this point, if the peerConnectTimeout is
493-
// still active, then we can disable it, as we're
494-
// receiving responses from the current peer.
495-
if connectionTicker != nil && !connectionTimeout.Stop() {
496-
select {
497-
case <-connectionTimeout.C:
498-
default:
499-
}
500-
}
501-
connectionTicker = nil
502-
503-
// The current peer we're querying has failed to answer the
504-
// query. Time to select a new peer and query it.
505-
case <-peerTimeout.C:
506-
if queryPeer != nil {
507-
queryPeer.unsubscribeRecvMsgs(subscription)
508-
}
509-
510-
queryPeer = nil
511-
for _, peer := range s.Peers() {
512-
// If the peer is no longer connected, we'll
513-
// skip them.
514-
if !peer.Connected() {
515-
continue
516-
}
517-
518-
// If we've yet to try this peer, we'll make
519-
// sure to do so. If we've exceeded the number
520-
// of tries we should retry this peer, then
521-
// we'll skip them.
522-
numTries, ok := peerTries[peer.Addr()]
523-
if ok && numTries >= qo.numRetries {
524-
continue
525-
}
526-
527-
queryPeer = peer
528-
529-
// Found a peer we can query.
530-
peerTries[queryPeer.Addr()]++
531-
queryPeer.subscribeRecvMsg(subscription)
532-
queryPeer.QueueMessageWithEncoding(
533-
queryMsg, nil, qo.encoding,
534-
)
535-
break
536-
}
537-
538-
// If at this point, we don't yet have a query peer,
539-
// then we'll exit now as all the peers are exhausted.
540-
if queryPeer == nil {
541-
break checkResponses
542-
}
543-
}
544-
}
545-
546-
// Close the subscription quit channel and the done channel, if any.
547-
close(subQuit)
548-
peerTimeout.Stop()
549-
if qo.doneChan != nil {
550-
close(qo.doneChan)
551-
}
552-
}
553-
554392
// getFilterFromCache returns a filter from ChainService's FilterCache if it
555393
// exists, returning nil and error if it doesn't.
556394
func (s *ChainService) getFilterFromCache(blockHash *chainhash.Hash,

0 commit comments

Comments
 (0)