Skip to content

Commit af8a742

Browse files
committed
les: improved header fetcher and server statistics
1 parent e67500a commit af8a742

File tree

10 files changed

+812
-481
lines changed

10 files changed

+812
-481
lines changed

les/fetcher.go

Lines changed: 572 additions & 182 deletions
Large diffs are not rendered by default.

les/handler.go

Lines changed: 22 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,8 @@ import (
2424
"math/big"
2525
"net"
2626
"sync"
27-
"time"
2827

2928
"github.com/ethereum/go-ethereum/common"
30-
"github.com/ethereum/go-ethereum/common/mclock"
3129
"github.com/ethereum/go-ethereum/core"
3230
"github.com/ethereum/go-ethereum/core/state"
3331
"github.com/ethereum/go-ethereum/core/types"
@@ -60,7 +58,7 @@ const (
6058
MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request
6159
MaxTxSend = 64 // Amount of transactions to be send per request
6260

63-
disableClientRemovePeer = true
61+
disableClientRemovePeer = false
6462
)
6563

6664
// errIncompatibleConfig is returned if the requested protocols and configs are
@@ -157,44 +155,27 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
157155
Length: ProtocolLengths[i],
158156
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
159157
var entry *poolEntry
158+
peer := manager.newPeer(int(version), networkId, p, rw)
160159
if manager.serverPool != nil {
161160
addr := p.RemoteAddr().(*net.TCPAddr)
162-
entry = manager.serverPool.connect(p.ID(), addr.IP, uint16(addr.Port))
161+
entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port))
163162
if entry == nil {
164163
return fmt.Errorf("unwanted connection")
165164
}
166165
}
167-
peer := manager.newPeer(int(version), networkId, p, rw)
168166
peer.poolEntry = entry
169167
select {
170168
case manager.newPeerCh <- peer:
171169
manager.wg.Add(1)
172170
defer manager.wg.Done()
173-
start := mclock.Now()
174171
err := manager.handle(peer)
175172
if entry != nil {
176-
connTime := time.Duration(mclock.Now() - start)
177-
stopped := false
178-
select {
179-
case <-manager.quitSync:
180-
stopped = true
181-
default:
182-
}
183-
//fmt.Println("connTime", peer.id, connTime, stopped, err)
184-
quality := float64(1)
185-
setQuality := true
186-
if connTime < time.Minute*10 {
187-
quality = 0
188-
if stopped {
189-
setQuality = false
190-
}
191-
}
192-
manager.serverPool.disconnect(entry, quality, setQuality)
173+
manager.serverPool.disconnect(entry)
193174
}
194175
return err
195176
case <-manager.quitSync:
196177
if entry != nil {
197-
manager.serverPool.disconnect(entry, 0, false)
178+
manager.serverPool.disconnect(entry)
198179
}
199180
return p2p.DiscQuitting
200181
}
@@ -224,7 +205,6 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
224205
manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash,
225206
nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash,
226207
blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
227-
manager.fetcher = newLightFetcher(manager)
228208
}
229209

230210
if odr != nil {
@@ -254,10 +234,12 @@ func (pm *ProtocolManager) removePeer(id string) {
254234
glog.V(logger.Debug).Infof("LES: unregister peer %v", id)
255235
if pm.lightSync {
256236
pm.downloader.UnregisterPeer(id)
257-
pm.odr.UnregisterPeer(peer)
258237
if pm.txrelay != nil {
259238
pm.txrelay.removePeer(id)
260239
}
240+
if pm.fetcher != nil {
241+
pm.fetcher.removePeer(peer)
242+
}
261243
}
262244
if err := pm.peers.Unregister(id); err != nil {
263245
glog.V(logger.Error).Infoln("Removal failed:", err)
@@ -276,8 +258,10 @@ func (pm *ProtocolManager) Start(srvr *p2p.Server) {
276258
lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
277259
if pm.lightSync {
278260
// start sync handler
279-
if srvr != nil {
261+
if srvr != nil { // srvr is nil during testing
280262
pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg)
263+
pm.odr.serverPool = pm.serverPool
264+
pm.fetcher = newLightFetcher(pm)
281265
}
282266
go pm.syncer()
283267
} else {
@@ -369,12 +353,17 @@ func (pm *ProtocolManager) handle(p *peer) error {
369353
requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
370354
return err
371355
}
372-
pm.odr.RegisterPeer(p)
373356
if pm.txrelay != nil {
374357
pm.txrelay.addPeer(p)
375358
}
376359

377-
pm.fetcher.notify(p, nil)
360+
p.lock.Lock()
361+
head := p.headInfo
362+
p.lock.Unlock()
363+
if pm.fetcher != nil {
364+
pm.fetcher.addPeer(p)
365+
pm.fetcher.announce(p, head)
366+
}
378367

379368
if p.poolEntry != nil {
380369
pm.serverPool.registered(p.poolEntry)
@@ -460,7 +449,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
460449
return errResp(ErrDecode, "%v: %v", msg, err)
461450
}
462451
glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth)
463-
pm.fetcher.notify(p, &req)
452+
if pm.fetcher != nil {
453+
go pm.fetcher.announce(p, &req)
454+
}
464455

465456
case GetBlockHeadersMsg:
466457
glog.V(logger.Debug).Infof("<=== GetBlockHeadersMsg from peer %v", p.id)
@@ -558,7 +549,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
558549
return errResp(ErrDecode, "msg %v: %v", msg, err)
559550
}
560551
p.fcServer.GotReply(resp.ReqID, resp.BV)
561-
if pm.fetcher.requestedID(resp.ReqID) {
552+
if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) {
562553
pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
563554
} else {
564555
err := pm.downloader.DeliverHeaders(p.id, resp.Headers)

les/helper_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"math/big"
2626
"sync"
2727
"testing"
28+
"time"
2829

2930
"github.com/ethereum/go-ethereum/common"
3031
"github.com/ethereum/go-ethereum/core"
@@ -334,3 +335,13 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu
334335
func (p *testPeer) close() {
335336
p.app.Close()
336337
}
338+
339+
type testServerPool peer
340+
341+
func (p *testServerPool) selectPeer(func(*peer) (bool, uint64)) *peer {
342+
return (*peer)(p)
343+
}
344+
345+
func (p *testServerPool) adjustResponseTime(*poolEntry, time.Duration, bool) {
346+
347+
}

les/odr.go

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,26 @@ var (
3737
// peerDropFn is a callback type for dropping a peer detected as malicious.
3838
type peerDropFn func(id string)
3939

40+
type odrPeerSelector interface {
41+
selectPeer(func(*peer) (bool, uint64)) *peer
42+
adjustResponseTime(*poolEntry, time.Duration, bool)
43+
}
44+
4045
type LesOdr struct {
4146
light.OdrBackend
4247
db ethdb.Database
4348
stop chan struct{}
4449
removePeer peerDropFn
4550
mlock, clock sync.Mutex
4651
sentReqs map[uint64]*sentReq
47-
peers *odrPeerSet
52+
serverPool odrPeerSelector
4853
lastReqID uint64
4954
}
5055

5156
func NewLesOdr(db ethdb.Database) *LesOdr {
5257
return &LesOdr{
5358
db: db,
5459
stop: make(chan struct{}),
55-
peers: newOdrPeerSet(),
5660
sentReqs: make(map[uint64]*sentReq),
5761
}
5862
}
@@ -77,16 +81,6 @@ type sentReq struct {
7781
answered chan struct{} // closed and set to nil when any peer answers it
7882
}
7983

80-
// RegisterPeer registers a new LES peer to the ODR capable peer set
81-
func (self *LesOdr) RegisterPeer(p *peer) error {
82-
return self.peers.register(p)
83-
}
84-
85-
// UnregisterPeer removes a peer from the ODR capable peer set
86-
func (self *LesOdr) UnregisterPeer(p *peer) {
87-
self.peers.unregister(p)
88-
}
89-
9084
const (
9185
MsgBlockBodies = iota
9286
MsgCode
@@ -142,29 +136,26 @@ func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout cha
142136

143137
select {
144138
case <-delivered:
145-
servTime := uint64(mclock.Now() - stime)
146-
self.peers.updateTimeout(peer, false)
147-
self.peers.updateServTime(peer, servTime)
139+
if self.serverPool != nil {
140+
self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), false)
141+
}
148142
return
149143
case <-time.After(softRequestTimeout):
150144
close(timeout)
151-
if self.peers.updateTimeout(peer, true) {
152-
self.removePeer(peer.id)
153-
}
154145
case <-self.stop:
155146
return
156147
}
157148

158149
select {
159150
case <-delivered:
160-
servTime := uint64(mclock.Now() - stime)
161-
self.peers.updateServTime(peer, servTime)
162-
return
163151
case <-time.After(hardRequestTimeout):
164-
self.removePeer(peer.id)
152+
go self.removePeer(peer.id)
165153
case <-self.stop:
166154
return
167155
}
156+
if self.serverPool != nil {
157+
self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), true)
158+
}
168159
}
169160

170161
// networkRequest sends a request to known peers until an answer is received
@@ -193,7 +184,13 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
193184

194185
exclude := make(map[*peer]struct{})
195186
for {
196-
if peer := self.peers.bestPeer(lreq, exclude); peer == nil {
187+
var p *peer
188+
if self.serverPool != nil {
189+
p = self.serverPool.selectPeer(func(p *peer) (bool, uint64) {
190+
return true, p.fcServer.CanSend(lreq.GetCost(p))
191+
})
192+
}
193+
if p == nil {
197194
select {
198195
case <-ctx.Done():
199196
return ctx.Err()
@@ -202,17 +199,17 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
202199
case <-time.After(retryPeers):
203200
}
204201
} else {
205-
exclude[peer] = struct{}{}
202+
exclude[p] = struct{}{}
206203
delivered := make(chan struct{})
207204
timeout := make(chan struct{})
208205
req.lock.Lock()
209-
req.sentTo[peer] = delivered
206+
req.sentTo[p] = delivered
210207
req.lock.Unlock()
211208
reqWg.Add(1)
212-
cost := lreq.GetCost(peer)
213-
peer.fcServer.SendRequest(reqID, cost)
214-
go self.requestPeer(req, peer, delivered, timeout, reqWg)
215-
lreq.Request(reqID, peer)
209+
cost := lreq.GetCost(p)
210+
p.fcServer.SendRequest(reqID, cost)
211+
go self.requestPeer(req, p, delivered, timeout, reqWg)
212+
lreq.Request(reqID, p)
216213

217214
select {
218215
case <-ctx.Done():

0 commit comments

Comments
 (0)