Skip to content

Commit 2285ce7

Browse files
committed
neutrino + query: Refactored ErrChan, jobResults, HandleResp, Progress
- Added Unfinished bool to jobResult to indicate successful jobs that still need to send another request to the peer to be considered complete. - Made ErrChan a query option in that way it is optional for different queries. - Refactored HandleResp, peer is now passed as query.Peer instead of using its address. - Changed type for query.Progress. Signed-off-by: Maureen Ononiwu <[email protected]>
1 parent dd02e22 commit 2285ce7

File tree

9 files changed

+683
-252
lines changed

9 files changed

+683
-252
lines changed

blockmanager.go

Lines changed: 16 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -850,43 +850,37 @@ func (c *checkpointedCFHeadersQuery) requests() []*query.Request {
850850

851851
// handleResponse is the internal response handler used for requests for this
852852
// CFHeaders query.
853-
func (c *checkpointedCFHeadersQuery) handleResponse(req, resp wire.Message,
854-
peerAddr string) query.Progress {
853+
func (c *checkpointedCFHeadersQuery) handleResponse(request query.ReqMessage, resp wire.Message,
854+
peer query.Peer) query.Progress {
855+
856+
peerAddr := ""
857+
if peer != nil {
858+
peerAddr = peer.Addr()
859+
}
860+
req := request.Message()
855861

856862
r, ok := resp.(*wire.MsgCFHeaders)
857863
if !ok {
858864
// We are only looking for cfheaders messages.
859-
return query.Progress{
860-
Finished: false,
861-
Progressed: false,
862-
}
865+
return query.NoResponse
863866
}
864867

865868
q, ok := req.(*wire.MsgGetCFHeaders)
866869
if !ok {
867870
// We sent a getcfheaders message, so that's what we should be
868871
// comparing against.
869-
return query.Progress{
870-
Finished: false,
871-
Progressed: false,
872-
}
872+
return query.NoResponse
873873
}
874874

875875
// The response doesn't match the query.
876876
if q.FilterType != r.FilterType || q.StopHash != r.StopHash {
877-
return query.Progress{
878-
Finished: false,
879-
Progressed: false,
880-
}
877+
return query.NoResponse
881878
}
882879

883880
checkPointIndex, ok := c.stopHashes[r.StopHash]
884881
if !ok {
885882
// We never requested a matching stop hash.
886-
return query.Progress{
887-
Finished: false,
888-
Progressed: false,
889-
}
883+
return query.NoResponse
890884
}
891885

892886
// Use either the genesis header or the previous checkpoint index as
@@ -920,10 +914,7 @@ func (c *checkpointedCFHeadersQuery) handleResponse(req, resp wire.Message,
920914
log.Errorf("Unable to ban peer %v: %v", peerAddr, err)
921915
}
922916

923-
return query.Progress{
924-
Finished: false,
925-
Progressed: false,
926-
}
917+
return query.NoResponse
927918
}
928919

929920
// At this point, the response matches the query, and the relevant
@@ -934,16 +925,10 @@ func (c *checkpointedCFHeadersQuery) handleResponse(req, resp wire.Message,
934925
select {
935926
case c.headerChan <- r:
936927
case <-c.blockMgr.quit:
937-
return query.Progress{
938-
Finished: false,
939-
Progressed: false,
940-
}
928+
return query.NoResponse
941929
}
942930

943-
return query.Progress{
944-
Finished: true,
945-
Progressed: true,
946-
}
931+
return query.Finished
947932
}
948933

949934
// sendQueryMessageWithEncoding sends a message to the peer with encoding.
@@ -1106,7 +1091,7 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
11061091
// Hand the queries to the work manager, and consume the verified
11071092
// responses as they come back.
11081093
errChan := b.cfg.QueryDispatcher.Query(
1109-
q.requests(), query.Cancel(b.quit), query.NoRetryMax(),
1094+
q.requests(), query.Cancel(b.quit), query.NoRetryMax(), query.ErrChan(make(chan error, 1)),
11101095
)
11111096

11121097
// Keep waiting for more headers as long as we haven't received an

blockmanager_test.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -214,14 +214,14 @@ func generateHeaders(genesisBlockHeader *wire.BlockHeader,
214214

215215
// generateResponses generates the MsgCFHeaders messages from the given queries
216216
// and headers.
217-
func generateResponses(msgs []wire.Message,
217+
func generateResponses(msgs []query.ReqMessage,
218218
headers *headers) ([]*wire.MsgCFHeaders, error) {
219219

220220
// Craft a response for each message.
221221
var responses []*wire.MsgCFHeaders
222222
for _, msg := range msgs {
223223
// Only GetCFHeaders expected.
224-
q, ok := msg.(*wire.MsgGetCFHeaders)
224+
q, ok := msg.Message().(*wire.MsgGetCFHeaders)
225225
if !ok {
226226
return nil, fmt.Errorf("got unexpected message %T",
227227
msg)
@@ -350,9 +350,9 @@ func TestBlockManagerInitialInterval(t *testing.T) {
350350
requests []*query.Request,
351351
options ...query.QueryOption) chan error {
352352

353-
var msgs []wire.Message
353+
var msgs []query.ReqMessage
354354
for _, q := range requests {
355-
msgs = append(msgs, q.Req.Message())
355+
msgs = append(msgs, q.Req)
356356
}
357357

358358
responses, err := generateResponses(msgs, headers)
@@ -379,13 +379,13 @@ func TestBlockManagerInitialInterval(t *testing.T) {
379379
// Let the blockmanager handle the
380380
// message.
381381
progress := requests[index].HandleResp(
382-
msgs[index], &resp, "",
382+
msgs[index], &resp, nil,
383383
)
384384

385-
if !progress.Finished {
385+
if progress != query.Finished {
386386
errChan <- fmt.Errorf("got "+
387-
"response false on "+
388-
"send of index %d: %v",
387+
" %v on "+
388+
"send of index %d: %v", progress,
389389
index, testDesc)
390390
return
391391
}
@@ -400,13 +400,13 @@ func TestBlockManagerInitialInterval(t *testing.T) {
400400
// Otherwise resend the response we
401401
// just sent.
402402
progress = requests[index].HandleResp(
403-
msgs[index], &resp2, "",
403+
msgs[index], &resp2, nil,
404404
)
405-
if !progress.Finished {
405+
if progress != query.Finished {
406406
errChan <- fmt.Errorf("got "+
407-
"response false on "+
408-
"resend of index %d: "+
409-
"%v", index, testDesc)
407+
" %v on "+
408+
"send of index %d: %v", progress,
409+
index, testDesc)
410410
return
411411
}
412412
}
@@ -580,9 +580,9 @@ func TestBlockManagerInvalidInterval(t *testing.T) {
580580
requests []*query.Request,
581581
options ...query.QueryOption) chan error {
582582

583-
var msgs []wire.Message
583+
var msgs []query.ReqMessage
584584
for _, q := range requests {
585-
msgs = append(msgs, q.Req.Message())
585+
msgs = append(msgs, q.Req)
586586
}
587587
responses, err := generateResponses(msgs, headers)
588588
require.NoError(t, err)
@@ -619,10 +619,10 @@ func TestBlockManagerInvalidInterval(t *testing.T) {
619619
// expect.
620620
for i := range responses {
621621
progress := requests[i].HandleResp(
622-
msgs[i], responses[i], "",
622+
msgs[i], responses[i], nil,
623623
)
624624
if i == test.firstInvalid {
625-
if progress.Finished {
625+
if progress == query.Finished {
626626
t.Errorf("expected interval "+
627627
"%d to be invalid", i)
628628
return
@@ -631,7 +631,7 @@ func TestBlockManagerInvalidInterval(t *testing.T) {
631631
break
632632
}
633633

634-
if !progress.Finished {
634+
if progress != query.Finished {
635635
t.Errorf("expected interval %d to be "+
636636
"valid", i)
637637
return

query.go

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,7 @@ var (
6666

6767
// noProgress will be used to indicate to a query.WorkManager that a
6868
// response makes no progress towards the completion of the query.
69-
noProgress = query.Progress{
70-
Finished: false,
71-
Progressed: false,
72-
}
69+
noProgress = query.NoResponse
7370
)
7471

7572
// queries are a set of options that can be modified per-query, unlike global
@@ -470,9 +467,10 @@ func (q *cfiltersQuery) request() *query.Request {
470467

471468
// handleResponse validates that the cfilter response we get from a peer is
472469
// sane given the getcfilter query that we made.
473-
func (q *cfiltersQuery) handleResponse(req, resp wire.Message,
474-
_ string) query.Progress {
470+
func (q *cfiltersQuery) handleResponse(r query.ReqMessage, resp wire.Message,
471+
peer query.Peer) query.Progress {
475472

473+
req := r.Message()
476474
// The request must have been a "getcfilters" msg.
477475
request, ok := req.(*wire.MsgGetCFilters)
478476
if !ok {
@@ -573,17 +571,11 @@ func (q *cfiltersQuery) handleResponse(req, resp wire.Message,
573571
// If there are still entries left in the headerIndex then the query
574572
// has made progress but has not yet completed.
575573
if len(q.headerIndex) != 0 {
576-
return query.Progress{
577-
Finished: false,
578-
Progressed: true,
579-
}
574+
return query.Progressed
580575
}
581576

582577
// The headerIndex is empty and so this query is complete.
583-
return query.Progress{
584-
Finished: true,
585-
Progressed: true,
586-
}
578+
return query.Finished
587579
}
588580

589581
// prepareCFiltersQuery creates a cfiltersQuery that can be used to fetch a
@@ -784,6 +776,7 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
784776
query.Cancel(s.quit),
785777
query.Encoding(qo.encoding),
786778
query.NumRetries(qo.numRetries),
779+
query.ErrChan(make(chan error, 1)),
787780
}
788781

789782
errChan := s.workManager.Query(
@@ -868,7 +861,12 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
868861
// handleResp will be called for each message received from a peer. It
869862
// will be used to signal to the work manager whether progress has been
870863
// made or not.
871-
handleResp := func(req, resp wire.Message, peer string) query.Progress {
864+
handleResp := func(request query.ReqMessage, resp wire.Message, sp query.Peer) query.Progress {
865+
req := request.Message()
866+
peer := ""
867+
if sp != nil {
868+
peer = sp.Addr()
869+
}
872870
// The request must have been a "getdata" msg.
873871
_, ok := req.(*wire.MsgGetData)
874872
if !ok {
@@ -933,10 +931,7 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
933931
// we declare it sane. We can kill the query and pass the
934932
// response back to the caller.
935933
foundBlock = block
936-
return query.Progress{
937-
Finished: true,
938-
Progressed: true,
939-
}
934+
return query.Finished
940935
}
941936

942937
// Prepare the query request.
@@ -968,6 +963,7 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
968963
query.Encoding(qo.encoding),
969964
query.NumRetries(qo.numRetries),
970965
query.Cancel(s.quit),
966+
query.ErrChan(make(chan error, 1)),
971967
}
972968

973969
// Send the request to the work manager and await a response.

query/interface.go

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ type queryOptions struct {
4343
// that a query can be retried. If this is set then numRetries has no
4444
// effect.
4545
noRetryMax bool
46+
47+
// errChan error channel with which the workmananger sends error.
48+
errChan chan error
4649
}
4750

4851
// QueryOption is a functional option argument to any of the network query
@@ -67,6 +70,14 @@ func (qo *queryOptions) applyQueryOptions(options ...QueryOption) {
6770
}
6871
}
6972

73+
// ErrChan is a query option that specifies the error channel which the workmanager
74+
// sends any error to.
75+
func ErrChan(err chan error) QueryOption {
76+
return func(qo *queryOptions) {
77+
qo.errChan = err
78+
}
79+
}
80+
7081
// NumRetries is a query option that specifies the number of times a query
7182
// should be retried.
7283
func NumRetries(num uint8) QueryOption {
@@ -107,19 +118,34 @@ func Cancel(cancel chan struct{}) QueryOption {
107118
}
108119
}
109120

110-
// Progress encloses the result of handling a response for a given Request,
111-
// determining whether the response did progress the query.
112-
type Progress struct {
113-
// Finished is true if the query was finished as a result of the
114-
// received response.
115-
Finished bool
116-
117-
// Progressed is true if the query made progress towards fully
118-
// answering the request as a result of the received response. This is
119-
// used for the requests types where more than one response is
120-
// expected.
121-
Progressed bool
122-
}
121+
// Progress encloses the result of handling a response for a given Request.
122+
type Progress string
123+
124+
var (
125+
126+
// Finished indicates we have received the complete, valid response for this request,
127+
// and so we are done with it.
128+
Finished Progress = "Received complete and valid response for request."
129+
130+
// Progressed indicates that we have received a valid response, but we are expecting more.
131+
Progressed Progress = "Received valid response, expecting more response for query."
132+
133+
// UnFinishedRequest indicates that we have received some response, but we need to rescheule the job
134+
// to completely fetch all the response required for this request.
135+
UnFinishedRequest Progress = "Received valid response, reschedule to complete request"
136+
137+
// ResponseErr indicates we obtained a valid response but response fails checks and needs to
138+
// be rescheduled.
139+
ResponseErr Progress = "Received valid response but fails checks "
140+
141+
// IgnoreRequest indicates that we have received a valid response but the workmanager need take
142+
// no action on the result of this job.
143+
IgnoreRequest Progress = "Received response but ignoring"
144+
145+
// NoResponse indicates that we have received an invalid response for this request, and we need
146+
// to wait for a valid one.
147+
NoResponse Progress = "Received invalid response"
148+
)
123149

124150
// Request is the main struct that defines a bitcoin network query to be sent to
125151
// connected peers.
@@ -138,7 +164,7 @@ type Request struct {
138164
// should validate the response and immediately return the progress.
139165
// The response should be handed off to another goroutine for
140166
// processing.
141-
HandleResp func(req, resp wire.Message, peer string) Progress
167+
HandleResp func(req ReqMessage, resp wire.Message, peer Peer) Progress
142168

143169
// SendQuery handles sending request to the worker's peer. It returns an error,
144170
// if one is encountered while sending the request.

0 commit comments

Comments
 (0)