Skip to content

Commit 57c289f

Browse files
committed
neutrino: parallelized block header download.
This commit distributes header download across peers leveraging checckpoints and the workmanager. Signed-off-by: Maureen Ononiwu <[email protected]>
1 parent f1f8897 commit 57c289f

File tree

8 files changed

+1685
-66
lines changed

8 files changed

+1685
-66
lines changed

blockmanager.go

Lines changed: 547 additions & 29 deletions
Large diffs are not rendered by default.

blockmanager_test.go

Lines changed: 932 additions & 23 deletions
Large diffs are not rendered by default.

neutrino.go

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,31 @@ func NewServerPeer(s *ChainService, isPersistent bool) *ServerPeer {
195195
}
196196
}
197197

198+
// IsSyncCandidate returns whether or not the peer is a candidate to consider
199+
// syncing from.
200+
func (sp *ServerPeer) IsSyncCandidate() bool {
201+
// The peer is not a candidate for sync if it's not a full node.
202+
return sp.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork
203+
}
204+
205+
// IsPeerBehindStartHeight returns a boolean indicating if the peer's last block height
206+
// is behind the start height of the request. If the peer is not behind the request start
207+
// height false is returned, otherwise, true is.
208+
func (sp *ServerPeer) IsPeerBehindStartHeight(req query.ReqMessage) bool {
209+
queryGetHeaders, ok := req.(*headerQuery)
210+
211+
if !ok {
212+
log.Debugf("request is not type headerQuery")
213+
214+
return true
215+
}
216+
217+
if sp.LastBlock() < queryGetHeaders.startHeight {
218+
return true
219+
}
220+
return false
221+
}
222+
198223
// newestBlock returns the current best block hash and height using the format
199224
// required by the configuration for the peer package.
200225
func (sp *ServerPeer) newestBlock() (*chainhash.Hash, int32, error) {
@@ -800,15 +825,21 @@ func NewChainService(cfg Config) (*ChainService, error) {
800825
}
801826

802827
bm, err := newBlockManager(&blockManagerCfg{
803-
ChainParams: s.chainParams,
804-
BlockHeaders: s.BlockHeaders,
805-
RegFilterHeaders: s.RegFilterHeaders,
806-
TimeSource: s.timeSource,
807-
QueryDispatcher: s.workManager,
808-
BanPeer: s.BanPeer,
809-
GetBlock: s.GetBlock,
810-
firstPeerSignal: s.firstPeerConnect,
811-
queryAllPeers: s.queryAllPeers,
828+
ChainParams: s.chainParams,
829+
BlockHeaders: s.BlockHeaders,
830+
RegFilterHeaders: s.RegFilterHeaders,
831+
TimeSource: s.timeSource,
832+
cfHeaderQueryDispatcher: s.workManager,
833+
BanPeer: s.BanPeer,
834+
GetBlock: s.GetBlock,
835+
firstPeerSignal: s.firstPeerConnect,
836+
queryAllPeers: s.queryAllPeers,
837+
blkHdrCheckptQueryDispatcher: query.NewWorkManager(&query.Config{
838+
ConnectedPeers: s.ConnectedPeers,
839+
NewWorker: query.NewWorker,
840+
Ranking: query.NewPeerRanking(),
841+
IsEligibleWorkerFunc: query.IsWorkerEligibleForBlkHdrFetch,
842+
}),
812843
})
813844
if err != nil {
814845
return nil, err
@@ -1610,6 +1641,9 @@ func (s *ChainService) Start() error {
16101641
s.addrManager.Start()
16111642
s.blockManager.Start()
16121643
s.blockSubscriptionMgr.Start()
1644+
if err := s.blockManager.cfg.blkHdrCheckptQueryDispatcher.Start(); err != nil {
1645+
return fmt.Errorf("unable to start block header work manager: %v", err)
1646+
}
16131647
if err := s.workManager.Start(); err != nil {
16141648
return fmt.Errorf("unable to start work manager: %v", err)
16151649
}

query/interface.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,4 +202,11 @@ type Peer interface {
202202
// OnDisconnect returns a channel that will be closed when this peer is
203203
// disconnected.
204204
OnDisconnect() <-chan struct{}
205+
206+
// IsPeerBehindStartHeight returns a boolean indicating if the peer's known last height is behind
207+
// the request's start Height which it receives as an argument.
208+
IsPeerBehindStartHeight(req ReqMessage) bool
209+
210+
// IsSyncCandidate returns if the peer is a sync candidate.
211+
IsSyncCandidate() bool
205212
}

query/worker.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
9494
msgChan, cancel := peer.SubscribeRecvMsg()
9595
defer cancel()
9696

97-
nexJobLoop:
97+
nextJobLoop:
9898
for {
9999
log.Tracef("Worker %v waiting for more work", peer.Addr())
100100

@@ -153,7 +153,7 @@ nexJobLoop:
153153
case <-quit:
154154
return
155155
}
156-
goto nexJobLoop
156+
goto nextJobLoop
157157
}
158158
}
159159

@@ -302,6 +302,28 @@ nexJobLoop:
302302
}
303303
}
304304

305+
func (w *worker) IsSyncCandidate() bool {
306+
return w.peer.IsSyncCandidate()
307+
}
308+
309+
func (w *worker) IsPeerBehindStartHeight(req ReqMessage) bool {
310+
return w.peer.IsPeerBehindStartHeight(req)
311+
}
312+
313+
// IsWorkerEligibleForBlkHdrFetch is the eligibility function used for the BlockHdrWorkManager to determine workers
314+
// eligible to receive jobs (the job is to fetch headers). If the peer is not a sync candidate or if its last known
315+
// block height is behind the job query's start height, it returns false. Otherwise, it returns true.
316+
func IsWorkerEligibleForBlkHdrFetch(r *activeWorker, next *queryJob) bool {
317+
if !r.w.IsSyncCandidate() {
318+
return false
319+
}
320+
321+
if r.w.IsPeerBehindStartHeight(next.Req) {
322+
return false
323+
}
324+
return true
325+
}
326+
305327
// NewJob returns a channel where work that is to be handled by the worker can
306328
// be sent. If the worker reads a queryJob from this channel, it is guaranteed
307329
// that a response will eventually be deliverd on the results channel (except

query/worker_test.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ import (
1010
)
1111

1212
type mockQueryEncoded struct {
13-
message *wire.MsgGetData
14-
encoding wire.MessageEncoding
15-
index float64
13+
message *wire.MsgGetData
14+
encoding wire.MessageEncoding
15+
index float64
16+
startHeight int
1617
}
1718

1819
func (m *mockQueryEncoded) Message() wire.Message {
@@ -49,6 +50,8 @@ type mockPeer struct {
4950
responses chan<- wire.Message
5051
subscriptions chan chan wire.Message
5152
quit chan struct{}
53+
bestHeight int
54+
fullNode bool
5255
err error
5356
}
5457

@@ -69,6 +72,15 @@ func (m *mockPeer) Addr() string {
6972
return m.addr
7073
}
7174

75+
func (m *mockPeer) IsPeerBehindStartHeight(request ReqMessage) bool {
76+
r := request.(*mockQueryEncoded)
77+
return m.bestHeight < r.startHeight
78+
}
79+
80+
func (m *mockPeer) IsSyncCandidate() bool {
81+
return m.fullNode
82+
}
83+
7284
// makeJob returns a new query job that will be done when it is given the
7385
// finalResp message. Similarly ot will progress on being given the
7486
// progressResp message, while any other message will be ignored.

query/workmanager.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ type Worker interface {
4646
// delivered on the results channel (except when the quit channel has
4747
// been closed).
4848
NewJob() chan<- *queryJob
49+
50+
// IsPeerBehindStartHeight returns a boolean indicating if the peer's known last height is behind
51+
// the request's start Height which it receives as an argument.
52+
IsPeerBehindStartHeight(req ReqMessage) bool
53+
54+
// IsSyncCandidate returns if the peer is a sync candidate.
55+
IsSyncCandidate() bool
4956
}
5057

5158
// PeerRanking is an interface that must be satisfied by the underlying module

query/workmanager_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@ type mockWorker struct {
1515
results chan *jobResult
1616
}
1717

18+
func (m *mockWorker) IsPeerBehindStartHeight(req ReqMessage) bool {
19+
return m.peer.IsPeerBehindStartHeight(req)
20+
}
21+
22+
func (m *mockWorker) IsSyncCandidate() bool {
23+
return m.peer.IsSyncCandidate()
24+
}
25+
1826
var _ Worker = (*mockWorker)(nil)
1927

2028
func (m *mockWorker) NewJob() chan<- *queryJob {
@@ -977,3 +985,105 @@ func TestWorkManagerResultUnfinished(t *testing.T) {
977985
t.Fatalf("nothing received on errChan")
978986
}
979987
}
988+
989+
// TestIsWorkerEligibleForBlkHdrFetch tests the IsWorkerEligibleForBlkHdrFetch function.
990+
func TestIsWorkerEligibleForBlkHdrFetch(t *testing.T) {
991+
type testArgs struct {
992+
name string
993+
activeWorker *activeWorker
994+
job *queryJob
995+
expectedEligibility bool
996+
}
997+
998+
testCases := []testArgs{
999+
{
1000+
name: "peer sync candidate, best height behind job start Height",
1001+
activeWorker: &activeWorker{
1002+
w: &mockWorker{
1003+
peer: &mockPeer{
1004+
bestHeight: 5,
1005+
fullNode: true,
1006+
},
1007+
},
1008+
},
1009+
job: &queryJob{
1010+
Request: &Request{
1011+
Req: &mockQueryEncoded{
1012+
startHeight: 10,
1013+
},
1014+
},
1015+
},
1016+
expectedEligibility: false,
1017+
},
1018+
1019+
{
1020+
name: "peer sync candidate, best height ahead job start Height",
1021+
activeWorker: &activeWorker{
1022+
w: &mockWorker{
1023+
peer: &mockPeer{
1024+
bestHeight: 10,
1025+
fullNode: true,
1026+
},
1027+
},
1028+
},
1029+
job: &queryJob{
1030+
Request: &Request{
1031+
Req: &mockQueryEncoded{
1032+
startHeight: 5,
1033+
},
1034+
},
1035+
},
1036+
expectedEligibility: true,
1037+
},
1038+
1039+
{
1040+
name: "peer not sync candidate, best height behind job start Height",
1041+
activeWorker: &activeWorker{
1042+
w: &mockWorker{
1043+
peer: &mockPeer{
1044+
bestHeight: 5,
1045+
fullNode: false,
1046+
},
1047+
},
1048+
},
1049+
job: &queryJob{
1050+
Request: &Request{
1051+
Req: &mockQueryEncoded{
1052+
startHeight: 10,
1053+
},
1054+
},
1055+
},
1056+
expectedEligibility: false,
1057+
},
1058+
1059+
{
1060+
name: "peer not sync candidate, best height ahead job start Height",
1061+
activeWorker: &activeWorker{
1062+
w: &mockWorker{
1063+
peer: &mockPeer{
1064+
bestHeight: 10,
1065+
fullNode: false,
1066+
},
1067+
},
1068+
},
1069+
job: &queryJob{
1070+
Request: &Request{
1071+
Req: &mockQueryEncoded{
1072+
startHeight: 5,
1073+
},
1074+
},
1075+
},
1076+
expectedEligibility: false,
1077+
},
1078+
}
1079+
1080+
for _, test := range testCases {
1081+
t.Run(test.name, func(t *testing.T) {
1082+
isEligible := IsWorkerEligibleForBlkHdrFetch(test.activeWorker, test.job)
1083+
if isEligible != test.expectedEligibility {
1084+
t.Fatalf("Expected '%v'for eligibility check but got"+
1085+
"'%v'\n", test.expectedEligibility, isEligible)
1086+
}
1087+
})
1088+
}
1089+
}

0 commit comments

Comments
 (0)