Skip to content

Commit 4dbe2e8

Browse files
committed
neutrino + query: Request made more flexible to querying.
This commit makes the query package more flexible in querying peers. This is done by adding sendQuery function field to query.Request struct. Instead of using only QueueMessageWithEncoding for all requests. This would be useful in coming commits where we would be using pushGetHeadersMsg to fetch block headers from peers. Consequent changes: encoding was removed in the queryJob field as it would not be useful for all requests. Requests that need encoding can define it as one of the fields in its own implementation of the new interface created as a type for the Req field in Request. PriorityIndex function signature would be used in the coming commits to indicate the the priority a request should preferably have in a query batch. An implementaion of the interface was created for GetCfheaders, GetCfilter and getData requests. Tests were updated and added to reflect these changes. Signed-off-by: Maureen Ononiwu <[email protected]>
1 parent 2442639 commit 4dbe2e8

File tree

8 files changed

+192
-28
lines changed

8 files changed

+192
-28
lines changed

blockmanager.go

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package neutrino
55
import (
66
"bytes"
77
"container/list"
8+
"errors"
89
"fmt"
910
"math"
1011
"math/big"
@@ -808,19 +809,39 @@ func (b *blockManager) getUncheckpointedCFHeaders(
808809
// handle a query for checkpointed filter headers.
809810
type checkpointedCFHeadersQuery struct {
810811
blockMgr *blockManager
811-
msgs []wire.Message
812+
msgs []*encodedQuery
812813
checkpoints []*chainhash.Hash
813814
stopHashes map[chainhash.Hash]uint32
814815
headerChan chan *wire.MsgCFHeaders
815816
}
816817

818+
// encodedQuery holds all the information needed to query a message that pushes requests
819+
// using the QueryMessagingWithEncoding method.
820+
type encodedQuery struct {
821+
message wire.Message
822+
encoding wire.MessageEncoding
823+
priorityIndex uint64
824+
}
825+
826+
// Message returns the wire.Message of encodedQuery's struct.
827+
func (e *encodedQuery) Message() wire.Message {
828+
return e.message
829+
}
830+
831+
// PriorityIndex returns the specified priority the caller wants
832+
// the request to take.
833+
func (e *encodedQuery) PriorityIndex() uint64 {
834+
return e.priorityIndex
835+
}
836+
817837
// requests creates the query.Requests for this CF headers query.
818838
func (c *checkpointedCFHeadersQuery) requests() []*query.Request {
819839
reqs := make([]*query.Request, len(c.msgs))
820840
for idx, m := range c.msgs {
821841
reqs[idx] = &query.Request{
822842
Req: m,
823843
HandleResp: c.handleResponse,
844+
SendQuery: sendQueryMessageWithEncoding,
824845
}
825846
}
826847
return reqs
@@ -924,6 +945,24 @@ func (c *checkpointedCFHeadersQuery) handleResponse(req, resp wire.Message,
924945
}
925946
}
926947

948+
// sendQueryMessageWithEncoding sends a message to the peer with encoding.
949+
func sendQueryMessageWithEncoding(peer query.Peer, req query.ReqMessage) error {
950+
sp, ok := peer.(*ServerPeer)
951+
if !ok {
952+
err := "peer is not of type ServerPeer"
953+
log.Errorf(err)
954+
return errors.New(err)
955+
}
956+
request, ok := req.(*encodedQuery)
957+
if !ok {
958+
return errors.New("invalid request type")
959+
}
960+
961+
sp.QueueMessageWithEncoding(request.message, nil, request.encoding)
962+
963+
return nil
964+
}
965+
927966
// getCheckpointedCFHeaders catches a filter header store up with the
928967
// checkpoints we got from the network. It assumes that the filter header store
929968
// matches the checkpoints up to the tip of the store.
@@ -959,7 +998,7 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
959998
// the remaining checkpoint intervals.
960999
numCheckpts := uint32(len(checkpoints)) - startingInterval
9611000
numQueries := (numCheckpts + maxCFCheckptsPerQuery - 1) / maxCFCheckptsPerQuery
962-
queryMsgs := make([]wire.Message, 0, numQueries)
1001+
queryMsgs := make([]*encodedQuery, 0, numQueries)
9631002

9641003
// We'll also create an additional set of maps that we'll use to
9651004
// re-order the responses as we get them in.
@@ -1004,9 +1043,12 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
10041043

10051044
// Once we have the stop hash, we can construct the query
10061045
// message itself.
1007-
queryMsg := wire.NewMsgGetCFHeaders(
1008-
fType, startHeightRange, &stopHash,
1009-
)
1046+
queryMsg := &encodedQuery{
1047+
message: wire.NewMsgGetCFHeaders(
1048+
fType, startHeightRange, &stopHash,
1049+
),
1050+
encoding: wire.WitnessEncoding,
1051+
}
10101052

10111053
// We'll mark that the ith interval is queried by this message,
10121054
// and also map the stop hash back to the index of this message.

blockmanager_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ func TestBlockManagerInitialInterval(t *testing.T) {
352352

353353
var msgs []wire.Message
354354
for _, q := range requests {
355-
msgs = append(msgs, q.Req)
355+
msgs = append(msgs, q.Req.Message())
356356
}
357357

358358
responses, err := generateResponses(msgs, headers)
@@ -582,7 +582,7 @@ func TestBlockManagerInvalidInterval(t *testing.T) {
582582

583583
var msgs []wire.Message
584584
for _, q := range requests {
585-
msgs = append(msgs, q.Req)
585+
msgs = append(msgs, q.Req.Message())
586586
}
587587
responses, err := generateResponses(msgs, headers)
588588
require.NoError(t, err)

query.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -435,13 +435,17 @@ type cfiltersQuery struct {
435435
// request couples a query message with the handler to be used for the response
436436
// in a query.Request struct.
437437
func (q *cfiltersQuery) request() *query.Request {
438-
msg := wire.NewMsgGetCFilters(
439-
q.filterType, uint32(q.startHeight), q.stopHash,
440-
)
438+
msg := &encodedQuery{
439+
message: wire.NewMsgGetCFilters(
440+
q.filterType, uint32(q.startHeight), q.stopHash,
441+
),
442+
encoding: wire.WitnessEncoding,
443+
}
441444

442445
return &query.Request{
443446
Req: msg,
444447
HandleResp: q.handleResponse,
448+
SendQuery: sendQueryMessageWithEncoding,
445449
}
446450
}
447451

@@ -833,6 +837,10 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
833837
// Construct the appropriate getdata message to fetch the target block.
834838
getData := wire.NewMsgGetData()
835839
_ = getData.AddInvVect(inv)
840+
msg := &encodedQuery{
841+
message: getData,
842+
encoding: wire.WitnessEncoding,
843+
}
836844

837845
var foundBlock *btcutil.Block
838846

@@ -912,8 +920,9 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
912920

913921
// Prepare the query request.
914922
request := &query.Request{
915-
Req: getData,
923+
Req: msg,
916924
HandleResp: handleResp,
925+
SendQuery: sendQueryMessageWithEncoding,
917926
}
918927

919928
// Prepare the query options.

query/interface.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ type Progress struct {
125125
// connected peers.
126126
type Request struct {
127127
// Req is the message request to send.
128-
Req wire.Message
128+
Req ReqMessage
129129

130130
// HandleResp is a response handler that will be called for every
131131
// message received from the peer that the request was made to. It
@@ -139,6 +139,15 @@ type Request struct {
139139
// The response should be handed off to another goroutine for
140140
// processing.
141141
HandleResp func(req, resp wire.Message, peer string) Progress
142+
143+
// SendQuery handles sending request to the worker's peer. It returns an error,
144+
// if one is encountered while sending the request.
145+
SendQuery func(peer Peer, request ReqMessage) error
146+
}
147+
148+
type ReqMessage interface {
149+
Message() wire.Message
150+
PriorityIndex() uint64
142151
}
143152

144153
// WorkManager defines an API for a manager that dispatches queries to bitcoin

query/worker.go

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package query
33
import (
44
"errors"
55
"time"
6-
7-
"github.com/btcsuite/btcd/wire"
86
)
97

108
var (
@@ -27,7 +25,6 @@ type queryJob struct {
2725
tries uint8
2826
index uint64
2927
timeout time.Duration
30-
encoding wire.MessageEncoding
3128
cancelChan <-chan struct{}
3229
*Request
3330
}
@@ -89,6 +86,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
8986
msgChan, cancel := peer.SubscribeRecvMsg()
9087
defer cancel()
9188

89+
nexJobLoop:
9290
for {
9391
log.Tracef("Worker %v waiting for more work", peer.Addr())
9492

@@ -133,7 +131,22 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
133131
log.Tracef("Worker %v queuing job %T with index %v",
134132
peer.Addr(), job.Req, job.Index())
135133

136-
peer.QueueMessageWithEncoding(job.Req, nil, job.encoding)
134+
err := job.SendQuery(peer, job.Req)
135+
136+
// If any error occurs while sending query send a message to the worker's feedback Chan
137+
// which would be handled by the "LOOP" below.
138+
if err != nil {
139+
select {
140+
case results <- &jobResult{
141+
job: job,
142+
peer: peer,
143+
err: err,
144+
}:
145+
case <-quit:
146+
return
147+
}
148+
goto nexJobLoop
149+
}
137150
}
138151

139152
// Wait for the correct response to be received from the peer,
@@ -143,15 +156,15 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
143156
timeout = time.NewTimer(job.timeout)
144157
)
145158

146-
Loop:
159+
feedbackLoop:
147160
for {
148161
select {
149162
// A message was received from the peer, use the
150163
// response handler to check whether it was answering
151164
// our request.
152165
case resp := <-msgChan:
153166
progress := job.HandleResp(
154-
job.Req, resp, peer.Addr(),
167+
job.Req.Message(), resp, peer.Addr(),
155168
)
156169

157170
log.Tracef("Worker %v handled msg %T while "+
@@ -176,12 +189,12 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
176189
job.timeout,
177190
)
178191
}
179-
continue Loop
192+
continue feedbackLoop
180193
}
181194

182195
// We did get a valid response, and can break
183196
// the loop.
184-
break Loop
197+
break feedbackLoop
185198

186199
// If the timeout is reached before a valid response
187200
// has been received, we exit with an error.
@@ -193,7 +206,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
193206
"with job index %v", peer.Addr(),
194207
job.Req, job.Index())
195208

196-
break Loop
209+
break feedbackLoop
197210

198211
// If the peer disconnects before giving us a valid
199212
// answer, we'll also exit with an error.
@@ -203,7 +216,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
203216
job.Index())
204217

205218
jobErr = ErrPeerDisconnected
206-
break Loop
219+
break feedbackLoop
207220

208221
// If the job was canceled, we report this back to the
209222
// work manager.
@@ -212,7 +225,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
212225
peer.Addr(), job.Index())
213226

214227
jobErr = ErrJobCanceled
215-
break Loop
228+
break feedbackLoop
216229

217230
case <-quit:
218231
return

0 commit comments

Comments
 (0)