Skip to content

Commit 9b15992

Browse files
committed
parellized header download in block header checkpointed region
Signed-off-by: Maureen Ononiwu <[email protected]>
1 parent 42a196f commit 9b15992

File tree

12 files changed

+2952
-358
lines changed

12 files changed

+2952
-358
lines changed

blockmanager.go

Lines changed: 633 additions & 60 deletions
Large diffs are not rendered by default.

blockmanager_test.go

Lines changed: 927 additions & 29 deletions
Large diffs are not rendered by default.

neutrino.go

Lines changed: 79 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -180,21 +180,43 @@ type ServerPeer struct {
180180
recvSubscribers map[spMsgSubscription]struct{}
181181
recvSubscribers2 map[msgSubscription]struct{}
182182
mtxSubscribers sync.RWMutex
183+
184+
//recentReqDuration is the time between sending a request and receiving a response
185+
//while querying this peer using the workmanager.
186+
recentReqDuration time.Duration
187+
188+
//recentReqStartTime is the start time of the most recent request sent to this peer while
189+
//using the workmanager to orchestrate the query.
190+
recentReqStartTime time.Time
183191
}
184192

185193
// NewServerPeer returns a new ServerPeer instance. The peer needs to be set by
186194
// the caller.
187195
func NewServerPeer(s *ChainService, isPersistent bool) *ServerPeer {
188196
return &ServerPeer{
189-
server: s,
190-
persistent: isPersistent,
191-
knownAddresses: lru.NewCache[string, *cachedAddr](5000),
192-
quit: make(chan struct{}),
193-
recvSubscribers: make(map[spMsgSubscription]struct{}),
194-
recvSubscribers2: make(map[msgSubscription]struct{}),
197+
server: s,
198+
persistent: isPersistent,
199+
knownAddresses: lru.NewCache[string, *cachedAddr](5000),
200+
quit: make(chan struct{}),
201+
recvSubscribers: make(map[spMsgSubscription]struct{}),
202+
recvSubscribers2: make(map[msgSubscription]struct{}),
203+
recentReqDuration: 2 * time.Second,
195204
}
196205
}
197206

207+
// LastReqDuration returns the most recent request duaration of the peer.
208+
func (sp *ServerPeer) LastReqDuration() time.Duration {
209+
return sp.recentReqDuration
210+
}
211+
212+
// UpdateRequestDuration computes the time between the peer's most recent request's start time and now.
213+
// The duration is assigned to the peer's recentReqDuration field.
214+
func (sp *ServerPeer) UpdateRequestDuration() {
215+
duration := time.Since(sp.recentReqStartTime)
216+
sp.recentReqDuration = duration
217+
log.Debugf("peer=%v, updated duration to=%v", sp.Addr(), duration.Seconds())
218+
}
219+
198220
// newestBlock returns the current best block hash and height using the format
199221
// required by the configuration for the peer package.
200222
func (sp *ServerPeer) newestBlock() (*chainhash.Hash, int32, error) {
@@ -742,6 +764,8 @@ func NewChainService(cfg Config) (*ChainService, error) {
742764
ConnectedPeers: s.ConnectedPeers,
743765
NewWorker: query.NewWorker,
744766
Ranking: query.NewPeerRanking(),
767+
OrderPeers: query.SortPeersByReqDuration,
768+
DebugName: "GeneralWorkManager",
745769
})
746770

747771
var err error
@@ -800,15 +824,24 @@ func NewChainService(cfg Config) (*ChainService, error) {
800824
}
801825

802826
bm, err := newBlockManager(&blockManagerCfg{
803-
ChainParams: s.chainParams,
804-
BlockHeaders: s.BlockHeaders,
805-
RegFilterHeaders: s.RegFilterHeaders,
806-
TimeSource: s.timeSource,
807-
QueryDispatcher: s.workManager,
808-
BanPeer: s.BanPeer,
809-
GetBlock: s.GetBlock,
810-
firstPeerSignal: s.firstPeerConnect,
811-
queryAllPeers: s.queryAllPeers,
827+
ChainParams: s.chainParams,
828+
BlockHeaders: s.BlockHeaders,
829+
RegFilterHeaders: s.RegFilterHeaders,
830+
TimeSource: s.timeSource,
831+
cfHeaderQueryDispatcher: s.workManager,
832+
BanPeer: s.BanPeer,
833+
GetBlock: s.GetBlock,
834+
firstPeerSignal: s.firstPeerConnect,
835+
queryAllPeers: s.queryAllPeers,
836+
peerByAddr: s.PeerByAddr,
837+
blkHdrCheckptQueryDispatcher: query.NewWorkManager(&query.Config{
838+
ConnectedPeers: s.ConnectedPeers,
839+
NewWorker: query.NewWorker,
840+
Ranking: query.NewPeerRanking(),
841+
OrderPeers: query.SortPeersByReqDuration,
842+
IsEligibleWorkerFunc: query.IsWorkerEligibleForBlkHdrFetch,
843+
DebugName: "BlockHdrWorkManager",
844+
}),
812845
})
813846
if err != nil {
814847
return nil, err
@@ -1114,6 +1147,34 @@ func (s *ChainService) AddPeer(sp *ServerPeer) {
11141147
}
11151148
}
11161149

1150+
// IsSyncCandidate returns whether or not the peer is a candidate to consider
1151+
// syncing from.
1152+
func (sp *ServerPeer) IsSyncCandidate() bool {
1153+
// The peer is not a candidate for sync if it's not a full node.
1154+
return sp.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork
1155+
}
1156+
1157+
// IsPeerBehindStartHeight returns a boolean indicating if the peer's last block height
1158+
// is behind the start height of the request. If the peer is not behind the request start
1159+
// height false is returned, otherwise, true is.
1160+
func (sp *ServerPeer) IsPeerBehindStartHeight(req wire.Message) bool {
1161+
queryGetHeaders, ok := req.(*headerQuery)
1162+
1163+
if !ok {
1164+
log.Debugf("request is not type headerQuery")
1165+
1166+
return true
1167+
}
1168+
1169+
if sp.LastBlock() < queryGetHeaders.startHeight {
1170+
1171+
return true
1172+
1173+
}
1174+
1175+
return false
1176+
}
1177+
11171178
// AddBytesSent adds the passed number of bytes to the total bytes sent counter
11181179
// for the server. It is safe for concurrent access.
11191180
func (s *ChainService) AddBytesSent(bytesSent uint64) {
@@ -1609,6 +1670,9 @@ func (s *ChainService) Start() error {
16091670
// needed by peers.
16101671
s.addrManager.Start()
16111672
s.blockManager.Start()
1673+
if err := s.blockManager.cfg.blkHdrCheckptQueryDispatcher.Start(); err != nil {
1674+
return fmt.Errorf("unable to start block header work manager: %v", err)
1675+
}
16121676
s.blockSubscriptionMgr.Start()
16131677
if err := s.workManager.Start(); err != nil {
16141678
return fmt.Errorf("unable to start work manager: %v", err)

query.go

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -439,16 +439,33 @@ func (q *cfiltersQuery) request() *query.Request {
439439
q.filterType, uint32(q.startHeight), q.stopHash,
440440
)
441441

442+
queryMsg := query.ReqMessage{
443+
Message: msg,
444+
}
442445
return &query.Request{
443-
Req: msg,
446+
Req: &queryMsg,
444447
HandleResp: q.handleResponse,
448+
SendQuery: sendQueryMessageWithEncoding,
449+
CloneReq: func(req query.ReqMessage) *query.ReqMessage {
450+
oldReq, ok := req.Message.(*wire.MsgGetCFilters)
451+
if !ok {
452+
log.Errorf("request not of type *wire.MsgCFHeaders")
453+
}
454+
newReq := &query.ReqMessage{
455+
Message: wire.NewMsgGetCFilters(
456+
oldReq.FilterType, oldReq.StartHeight, &oldReq.StopHash,
457+
),
458+
PriorityIndex: req.PriorityIndex,
459+
}
460+
return newReq
461+
},
445462
}
446463
}
447464

448465
// handleResponse validates that the cfilter response we get from a peer is
449466
// sane given the getcfilter query that we made.
450467
func (q *cfiltersQuery) handleResponse(req, resp wire.Message,
451-
_ string) query.Progress {
468+
peer query.Peer, _ *error) query.Progress {
452469

453470
// The request must have been a "getcfilters" msg.
454471
request, ok := req.(*wire.MsgGetCFilters)
@@ -462,6 +479,9 @@ func (q *cfiltersQuery) handleResponse(req, resp wire.Message,
462479
return noProgress
463480
}
464481

482+
if peer != nil {
483+
peer.UpdateRequestDuration()
484+
}
465485
// If the request filter type doesn't match the type we were expecting,
466486
// ignore this message.
467487
if q.filterType != request.FilterType {
@@ -691,6 +711,7 @@ func (s *ChainService) prepareCFiltersQuery(blockHash chainhash.Hash,
691711
func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
692712
filterType wire.FilterType, options ...QueryOption) (*gcs.Filter,
693713
error) {
714+
log.Debugf("Getting CFilter...")
694715

695716
// The only supported filter atm is the regular filter, so we'll reject
696717
// all other filters.
@@ -759,6 +780,7 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
759780
query.Cancel(s.quit),
760781
query.Encoding(qo.encoding),
761782
query.NumRetries(qo.numRetries),
783+
query.ErrChan(make(chan error, 1)),
762784
}
763785

764786
errChan := s.workManager.Query(
@@ -775,6 +797,8 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
775797
return nil, ErrShuttingDown
776798
}
777799

800+
log.Debugf("Gotten CFilter...%v", err)
801+
778802
// If there are elements left to receive, the query failed.
779803
if len(filterQuery.headerIndex) > 0 {
780804
numFilters := filterQuery.stopHeight -
@@ -834,12 +858,20 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
834858
getData := wire.NewMsgGetData()
835859
_ = getData.AddInvVect(inv)
836860

861+
msg := query.ReqMessage{
862+
Message: getData,
863+
}
837864
var foundBlock *btcutil.Block
838865

839866
// handleResp will be called for each message received from a peer. It
840867
// will be used to signal to the work manager whether progress has been
841868
// made or not.
842-
handleResp := func(req, resp wire.Message, peer string) query.Progress {
869+
handleResp := func(req, resp wire.Message, sp query.Peer, _ *error) query.Progress {
870+
871+
peer := ""
872+
if sp != nil {
873+
peer = sp.Addr()
874+
}
843875
// The request must have been a "getdata" msg.
844876
_, ok := req.(*wire.MsgGetData)
845877
if !ok {
@@ -852,6 +884,9 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
852884
return noProgress
853885
}
854886

887+
if sp != nil {
888+
sp.UpdateRequestDuration()
889+
}
855890
// If this isn't the block we asked for, ignore it.
856891
if response.BlockHash() != blockHash {
857892
return noProgress
@@ -912,15 +947,28 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
912947

913948
// Prepare the query request.
914949
request := &query.Request{
915-
Req: getData,
950+
Req: &msg,
916951
HandleResp: handleResp,
952+
SendQuery: sendQueryMessageWithEncoding,
953+
CloneReq: func(req query.ReqMessage) *query.ReqMessage {
954+
newMsg := wire.NewMsgGetData()
955+
_ = newMsg.AddInvVect(inv)
956+
957+
newReq := &query.ReqMessage{
958+
Message: newMsg,
959+
PriorityIndex: req.PriorityIndex,
960+
}
961+
962+
return newReq
963+
},
917964
}
918965

919966
// Prepare the query options.
920967
queryOpts := []query.QueryOption{
921968
query.Encoding(qo.encoding),
922969
query.NumRetries(qo.numRetries),
923970
query.Cancel(s.quit),
971+
query.ErrChan(make(chan error, 1)),
924972
}
925973

926974
// Send the request to the work manager and await a response.
@@ -934,6 +982,7 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
934982
return nil, ErrShuttingDown
935983
}
936984

985+
log.Debugf("Gotten block, %v", err)
937986
if foundBlock == nil {
938987
return nil, fmt.Errorf("couldn't retrieve block %s from "+
939988
"network", blockHash)

query/interface.go

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,15 @@ type queryOptions struct {
4343
// that a query can be retried. If this is set then numRetries has no
4444
// effect.
4545
noRetryMax bool
46+
47+
//keepBatch indicates if to delete batch after batch is done. We might need to the keep
48+
//tha batch it case of fetching checkpointed block headers where we could be done fetching
49+
//headers but we would need to keep the batch as some queries might fail verification and
50+
//we need to reschedule the job.
51+
keepBatch bool
52+
53+
//errChan error channel with which the workmananger sends error.
54+
errChan chan error
4655
}
4756

4857
// QueryOption is a functional option argument to any of the network query
@@ -75,6 +84,14 @@ func NumRetries(num uint8) QueryOption {
7584
}
7685
}
7786

87+
// KeepBatch is a query option that specifies if to keep batch after
88+
// query is done.
89+
func KeepBatch() QueryOption {
90+
return func(qo *queryOptions) {
91+
qo.keepBatch = true
92+
}
93+
}
94+
7895
// NoRetryMax is a query option that can be used to disable the cap on the
7996
// number of retries. If this is set then NumRetries has no effect.
8097
func NoRetryMax() QueryOption {
@@ -83,6 +100,14 @@ func NoRetryMax() QueryOption {
83100
}
84101
}
85102

103+
// ErrChan is a query option that specifies the error channel which the workmanager
104+
// sends any error to.
105+
func ErrChan(error chan error) QueryOption {
106+
return func(qo *queryOptions) {
107+
qo.errChan = error
108+
}
109+
}
110+
86111
// Timeout is a query option that specifies the total time a query is allowed
87112
// to be tried before it is failed.
88113
func Timeout(timeout time.Duration) QueryOption {
@@ -107,7 +132,7 @@ func Cancel(cancel chan struct{}) QueryOption {
107132
}
108133
}
109134

110-
// Progress encloses the result of handling a response for a given Request,
135+
// Progress encloses the result of handling a response for a given Request,
111136
// determining whether the response did progress the query.
112137
type Progress struct {
113138
// Finished is true if the query was finished as a result of the
@@ -125,7 +150,7 @@ type Progress struct {
125150
// connected peers.
126151
type Request struct {
127152
// Req is the message request to send.
128-
Req wire.Message
153+
Req *ReqMessage
129154

130155
// HandleResp is a response handler that will be called for every
131156
// message received from the peer that the request was made to. It
@@ -138,7 +163,22 @@ type Request struct {
138163
// should validate the response and immediately return the progress.
139164
// The response should be handed off to another goroutine for
140165
// processing.
141-
HandleResp func(req, resp wire.Message, peer string) Progress
166+
HandleResp func(req, resp wire.Message, peer Peer, jobErr *error) Progress
167+
168+
//SendQuery handles sending request to the worker's peer. It returns an error,
169+
//if one is encountered while sending the request.
170+
SendQuery func(worker Worker, job Task) error
171+
172+
//CloneReq clones the message.
173+
CloneReq func(message ReqMessage) *ReqMessage
174+
}
175+
176+
// ReqMessage is a struct that contains the wire.Message sent to the peers as well as a
177+
// priority index, in case the caller wants to manipulate how quickly the message is sent
178+
// by the workmanager to the worker.
179+
type ReqMessage struct {
180+
wire.Message
181+
PriorityIndex float64
142182
}
143183

144184
// WorkManager defines an API for a manager that dispatches queries to bitcoin
@@ -167,11 +207,6 @@ type Dispatcher interface {
167207
// Peer is the interface that defines the methods needed by the query package
168208
// to be able to make requests and receive responses from a network peer.
169209
type Peer interface {
170-
// QueueMessageWithEncoding adds the passed bitcoin message to the peer
171-
// send queue.
172-
QueueMessageWithEncoding(msg wire.Message, doneChan chan<- struct{},
173-
encoding wire.MessageEncoding)
174-
175210
// SubscribeRecvMsg adds a OnRead subscription to the peer. All bitcoin
176211
// messages received from this peer will be sent on the returned
177212
// channel. A closure is also returned, that should be called to cancel
@@ -184,4 +219,17 @@ type Peer interface {
184219
// OnDisconnect returns a channel that will be closed when this peer is
185220
// disconnected.
186221
OnDisconnect() <-chan struct{}
222+
223+
//LastReqDuration returns the last request duration of the peer.
224+
LastReqDuration() time.Duration
225+
226+
//UpdateRequestDuration updates the latest request duration of the peer.
227+
UpdateRequestDuration()
228+
229+
//IsPeerBehindStartHeight returns a boolean indicating if the peer's known last height is behind
230+
//the request's start Height which it receives as an argument.
231+
IsPeerBehindStartHeight(req wire.Message) bool
232+
233+
//IsSyncCandidate returns if the peer is a sync candidate.
234+
IsSyncCandidate() bool
187235
}

0 commit comments

Comments
 (0)