Skip to content

Commit 64b2787

Browse files
committed
neutrino: parallelized block header download.
This commit distributes header download across peers leveraging checckpoints and the workmanager. Signed-off-by: Maureen Ononiwu <[email protected]>
1 parent 2285ce7 commit 64b2787

File tree

8 files changed

+1612
-66
lines changed

8 files changed

+1612
-66
lines changed

blockmanager.go

Lines changed: 516 additions & 29 deletions
Large diffs are not rendered by default.

blockmanager_test.go

Lines changed: 890 additions & 23 deletions
Large diffs are not rendered by default.

neutrino.go

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,31 @@ func NewServerPeer(s *ChainService, isPersistent bool) *ServerPeer {
195195
}
196196
}
197197

198+
// IsSyncCandidate returns whether or not the peer is a candidate to consider
199+
// syncing from.
200+
func (sp *ServerPeer) IsSyncCandidate() bool {
201+
// The peer is not a candidate for sync if it's not a full node.
202+
return sp.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork
203+
}
204+
205+
// IsPeerBehindStartHeight returns a boolean indicating if the peer's last block height
206+
// is behind the start height of the request. If the peer is not behind the request start
207+
// height false is returned, otherwise, true is.
208+
func (sp *ServerPeer) IsPeerBehindStartHeight(req query.ReqMessage) bool {
209+
queryGetHeaders, ok := req.(*headerQuery)
210+
211+
if !ok {
212+
log.Debugf("request is not type headerQuery")
213+
214+
return true
215+
}
216+
217+
if sp.LastBlock() < queryGetHeaders.startHeight {
218+
return true
219+
}
220+
return false
221+
}
222+
198223
// newestBlock returns the current best block hash and height using the format
199224
// required by the configuration for the peer package.
200225
func (sp *ServerPeer) newestBlock() (*chainhash.Hash, int32, error) {
@@ -800,15 +825,21 @@ func NewChainService(cfg Config) (*ChainService, error) {
800825
}
801826

802827
bm, err := newBlockManager(&blockManagerCfg{
803-
ChainParams: s.chainParams,
804-
BlockHeaders: s.BlockHeaders,
805-
RegFilterHeaders: s.RegFilterHeaders,
806-
TimeSource: s.timeSource,
807-
QueryDispatcher: s.workManager,
808-
BanPeer: s.BanPeer,
809-
GetBlock: s.GetBlock,
810-
firstPeerSignal: s.firstPeerConnect,
811-
queryAllPeers: s.queryAllPeers,
828+
ChainParams: s.chainParams,
829+
BlockHeaders: s.BlockHeaders,
830+
RegFilterHeaders: s.RegFilterHeaders,
831+
TimeSource: s.timeSource,
832+
cfHeaderQueryDispatcher: s.workManager,
833+
BanPeer: s.BanPeer,
834+
GetBlock: s.GetBlock,
835+
firstPeerSignal: s.firstPeerConnect,
836+
queryAllPeers: s.queryAllPeers,
837+
blkHdrCheckptQueryDispatcher: query.NewWorkManager(&query.Config{
838+
ConnectedPeers: s.ConnectedPeers,
839+
NewWorker: query.NewWorker,
840+
Ranking: query.NewPeerRanking(),
841+
IsEligibleWorkerFunc: query.IsWorkerEligibleForBlkHdrFetch,
842+
}),
812843
})
813844
if err != nil {
814845
return nil, err
@@ -1610,6 +1641,9 @@ func (s *ChainService) Start() error {
16101641
s.addrManager.Start()
16111642
s.blockManager.Start()
16121643
s.blockSubscriptionMgr.Start()
1644+
if err := s.blockManager.cfg.blkHdrCheckptQueryDispatcher.Start(); err != nil {
1645+
return fmt.Errorf("unable to start block header work manager: %v", err)
1646+
}
16131647
if err := s.workManager.Start(); err != nil {
16141648
return fmt.Errorf("unable to start work manager: %v", err)
16151649
}

query/interface.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,4 +224,11 @@ type Peer interface {
224224
// OnDisconnect returns a channel that will be closed when this peer is
225225
// disconnected.
226226
OnDisconnect() <-chan struct{}
227+
228+
// IsPeerBehindStartHeight returns a boolean indicating if the peer's known last height is behind
229+
// the request's start Height which it receives as an argument.
230+
IsPeerBehindStartHeight(req ReqMessage) bool
231+
232+
// IsSyncCandidate returns true if the peer is a sync candidate.
233+
IsSyncCandidate() bool
227234
}

query/worker.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
9595
msgChan, cancel := peer.SubscribeRecvMsg()
9696
defer cancel()
9797

98-
nexJobLoop:
98+
nextJobLoop:
9999
for {
100100
log.Tracef("Worker %v waiting for more work", peer.Addr())
101101

@@ -154,7 +154,7 @@ nexJobLoop:
154154
case <-quit:
155155
return
156156
}
157-
goto nexJobLoop
157+
goto nextJobLoop
158158
}
159159
}
160160

@@ -308,6 +308,28 @@ nexJobLoop:
308308
}
309309
}
310310

311+
func (w *worker) IsSyncCandidate() bool {
312+
return w.peer.IsSyncCandidate()
313+
}
314+
315+
func (w *worker) IsPeerBehindStartHeight(req ReqMessage) bool {
316+
return w.peer.IsPeerBehindStartHeight(req)
317+
}
318+
319+
// IsWorkerEligibleForBlkHdrFetch is the eligibility function used for the BlockHdrWorkManager to determine workers
320+
// eligible to receive jobs (the job is to fetch headers). If the peer is not a sync candidate or if its last known
321+
// block height is behind the job query's start height, it returns false. Otherwise, it returns true.
322+
func IsWorkerEligibleForBlkHdrFetch(r *activeWorker, next *queryJob) bool {
323+
if !r.w.IsSyncCandidate() {
324+
return false
325+
}
326+
327+
if r.w.IsPeerBehindStartHeight(next.Req) {
328+
return false
329+
}
330+
return true
331+
}
332+
311333
// NewJob returns a channel where work that is to be handled by the worker can
312334
// be sent. If the worker reads a queryJob from this channel, it is guaranteed
313335
// that a response will eventually be deliverd on the results channel (except

query/worker_test.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ import (
1010
)
1111

1212
type mockQueryEncoded struct {
13-
message *wire.MsgGetData
14-
encoding wire.MessageEncoding
15-
index float64
13+
message *wire.MsgGetData
14+
encoding wire.MessageEncoding
15+
index float64
16+
startHeight int
1617
}
1718

1819
func (m *mockQueryEncoded) Message() wire.Message {
@@ -52,6 +53,8 @@ type mockPeer struct {
5253
responses chan<- wire.Message
5354
subscriptions chan chan wire.Message
5455
quit chan struct{}
56+
bestHeight int
57+
fullNode bool
5558
err error
5659
}
5760

@@ -72,6 +75,15 @@ func (m *mockPeer) Addr() string {
7275
return m.addr
7376
}
7477

78+
func (m *mockPeer) IsPeerBehindStartHeight(request ReqMessage) bool {
79+
r := request.(*mockQueryEncoded)
80+
return m.bestHeight < r.startHeight
81+
}
82+
83+
func (m *mockPeer) IsSyncCandidate() bool {
84+
return m.fullNode
85+
}
86+
7587
// makeJob returns a new query job that will be done when it is given the
7688
// finalResp message. Similarly ot will progress on being given the
7789
// progressResp message, while any other message will be ignored.

query/workmanager.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ type Worker interface {
4646
// delivered on the results channel (except when the quit channel has
4747
// been closed).
4848
NewJob() chan<- *queryJob
49+
50+
// IsPeerBehindStartHeight returns a boolean indicating if the peer's known last height is behind
51+
// the request's start Height which it receives as an argument.
52+
IsPeerBehindStartHeight(req ReqMessage) bool
53+
54+
// IsSyncCandidate returns if the peer is a sync candidate.
55+
IsSyncCandidate() bool
4956
}
5057

5158
// PeerRanking is an interface that must be satisfied by the underlying module

query/workmanager_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@ type mockWorker struct {
1515
results chan *jobResult
1616
}
1717

18+
func (m *mockWorker) IsPeerBehindStartHeight(req ReqMessage) bool {
19+
return m.peer.IsPeerBehindStartHeight(req)
20+
}
21+
22+
func (m *mockWorker) IsSyncCandidate() bool {
23+
return m.peer.IsSyncCandidate()
24+
}
25+
1826
var _ Worker = (*mockWorker)(nil)
1927

2028
func (m *mockWorker) NewJob() chan<- *queryJob {
@@ -985,3 +993,105 @@ func TestWorkManagerResultUnfinished(t *testing.T) {
985993
t.Fatalf("nothing received on errChan")
986994
}
987995
}
996+
997+
// TestIsWorkerEligibleForBlkHdrFetch tests the IsWorkerEligibleForBlkHdrFetch function.
998+
func TestIsWorkerEligibleForBlkHdrFetch(t *testing.T) {
999+
type testArgs struct {
1000+
name string
1001+
activeWorker *activeWorker
1002+
job *queryJob
1003+
expectedEligibility bool
1004+
}
1005+
1006+
testCases := []testArgs{
1007+
{
1008+
name: "peer sync candidate, best height behind job start Height",
1009+
activeWorker: &activeWorker{
1010+
w: &mockWorker{
1011+
peer: &mockPeer{
1012+
bestHeight: 5,
1013+
fullNode: true,
1014+
},
1015+
},
1016+
},
1017+
job: &queryJob{
1018+
Request: &Request{
1019+
Req: &mockQueryEncoded{
1020+
startHeight: 10,
1021+
},
1022+
},
1023+
},
1024+
expectedEligibility: false,
1025+
},
1026+
1027+
{
1028+
name: "peer sync candidate, best height ahead job start Height",
1029+
activeWorker: &activeWorker{
1030+
w: &mockWorker{
1031+
peer: &mockPeer{
1032+
bestHeight: 10,
1033+
fullNode: true,
1034+
},
1035+
},
1036+
},
1037+
job: &queryJob{
1038+
Request: &Request{
1039+
Req: &mockQueryEncoded{
1040+
startHeight: 5,
1041+
},
1042+
},
1043+
},
1044+
expectedEligibility: true,
1045+
},
1046+
1047+
{
1048+
name: "peer not sync candidate, best height behind job start Height",
1049+
activeWorker: &activeWorker{
1050+
w: &mockWorker{
1051+
peer: &mockPeer{
1052+
bestHeight: 5,
1053+
fullNode: false,
1054+
},
1055+
},
1056+
},
1057+
job: &queryJob{
1058+
Request: &Request{
1059+
Req: &mockQueryEncoded{
1060+
startHeight: 10,
1061+
},
1062+
},
1063+
},
1064+
expectedEligibility: false,
1065+
},
1066+
1067+
{
1068+
name: "peer not sync candidate, best height ahead job start Height",
1069+
activeWorker: &activeWorker{
1070+
w: &mockWorker{
1071+
peer: &mockPeer{
1072+
bestHeight: 10,
1073+
fullNode: false,
1074+
},
1075+
},
1076+
},
1077+
job: &queryJob{
1078+
Request: &Request{
1079+
Req: &mockQueryEncoded{
1080+
startHeight: 5,
1081+
},
1082+
},
1083+
},
1084+
expectedEligibility: false,
1085+
},
1086+
}
1087+
1088+
for _, test := range testCases {
1089+
t.Run(test.name, func(t *testing.T) {
1090+
isEligible := IsWorkerEligibleForBlkHdrFetch(test.activeWorker, test.job)
1091+
if isEligible != test.expectedEligibility {
1092+
t.Fatalf("Expected '%v'for eligibility check but got"+
1093+
"'%v'\n", test.expectedEligibility, isEligible)
1094+
}
1095+
})
1096+
}
1097+
}

0 commit comments

Comments
 (0)