Skip to content

Commit 9c45b44

Browse files
authored
Merge pull request #3607 from zsfelfoldi/light-fix2
les: fix private net issues, enable adding peers manually again
2 parents 690f6ea + a390ca5 commit 9c45b44

File tree

7 files changed

+65
-80
lines changed

7 files changed

+65
-80
lines changed

cmd/utils/flags.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,10 @@ func MakeNode(ctx *cli.Context, name, gitCommit string) *node.Node {
654654
vsn += "-" + gitCommit[:8]
655655
}
656656

657+
// if we're running a light client or server, force enable the v5 peer discovery unless it is explicitly disabled with --nodiscover
658+
// note that explicitly specifying --v5disc overrides --nodiscover, in which case the later only disables v4 discovery
659+
forceV5Discovery := (ctx.GlobalBool(LightModeFlag.Name) || ctx.GlobalInt(LightServFlag.Name) > 0) && !ctx.GlobalBool(NoDiscoverFlag.Name)
660+
657661
config := &node.Config{
658662
DataDir: MakeDataDir(ctx),
659663
KeyStoreDir: ctx.GlobalString(KeyStoreDirFlag.Name),
@@ -662,8 +666,8 @@ func MakeNode(ctx *cli.Context, name, gitCommit string) *node.Node {
662666
Name: name,
663667
Version: vsn,
664668
UserIdent: makeNodeUserIdent(ctx),
665-
NoDiscovery: ctx.GlobalBool(NoDiscoverFlag.Name) || ctx.GlobalBool(LightModeFlag.Name),
666-
DiscoveryV5: ctx.GlobalBool(DiscoveryV5Flag.Name) || ctx.GlobalBool(LightModeFlag.Name) || ctx.GlobalInt(LightServFlag.Name) > 0,
669+
NoDiscovery: ctx.GlobalBool(NoDiscoverFlag.Name) || ctx.GlobalBool(LightModeFlag.Name), // always disable v4 discovery in light client mode
670+
DiscoveryV5: ctx.GlobalBool(DiscoveryV5Flag.Name) || forceV5Discovery,
667671
DiscoveryV5Addr: MakeDiscoveryV5Address(ctx),
668672
BootstrapNodes: MakeBootstrapNodes(ctx),
669673
BootstrapNodesV5: MakeBootstrapNodesV5(ctx),

eth/backend.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ type Config struct {
105105

106106
type LesServer interface {
107107
Start(srvr *p2p.Server)
108-
Synced()
109108
Stop()
110109
Protocols() []p2p.Protocol
111110
}

eth/handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ func NewProtocolManager(config *params.ChainConfig, fastSync bool, networkId int
173173
return blockchain.CurrentBlock().NumberU64()
174174
}
175175
inserter := func(blocks types.Blocks) (int, error) {
176-
manager.setSynced() // Mark initial sync done on any fetcher import
176+
atomic.StoreUint32(&manager.synced, 1) // Mark initial sync done on any fetcher import
177177
return manager.insertChain(blocks)
178178
}
179179
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)

eth/sync.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
181181
if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
182182
return
183183
}
184-
pm.setSynced() // Mark initial sync done
184+
atomic.StoreUint32(&pm.synced, 1) // Mark initial sync done
185185

186186
// If fast sync was enabled, and we synced up, disable it
187187
if atomic.LoadUint32(&pm.fastSync) == 1 {
@@ -192,10 +192,3 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
192192
}
193193
}
194194
}
195-
196-
// setSynced sets the synced flag and notifies the light server if present
197-
func (pm *ProtocolManager) setSynced() {
198-
if atomic.SwapUint32(&pm.synced, 1) == 0 && pm.lesServer != nil {
199-
pm.lesServer.Synced()
200-
}
201-
}

les/handler.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,6 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
160160
if manager.serverPool != nil {
161161
addr := p.RemoteAddr().(*net.TCPAddr)
162162
entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port))
163-
if entry == nil {
164-
return fmt.Errorf("unwanted connection")
165-
}
166163
}
167164
peer.poolEntry = entry
168165
select {

les/server.go

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,7 @@ type LesServer struct {
4242
fcManager *flowcontrol.ClientManager // nil if our node is client only
4343
fcCostStats *requestCostStats
4444
defParams *flowcontrol.ServerParams
45-
srvr *p2p.Server
46-
synced, stopped bool
47-
lock sync.Mutex
45+
stopped bool
4846
}
4947

5048
func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
@@ -70,35 +68,13 @@ func (s *LesServer) Protocols() []p2p.Protocol {
7068
return s.protocolManager.SubProtocols
7169
}
7270

73-
// Start only starts the actual service if the ETH protocol has already been synced,
74-
// otherwise it will be started by Synced()
71+
// Start starts the LES server
7572
func (s *LesServer) Start(srvr *p2p.Server) {
76-
s.lock.Lock()
77-
defer s.lock.Unlock()
78-
79-
s.srvr = srvr
80-
if s.synced {
81-
s.protocolManager.Start(s.srvr)
82-
}
83-
}
84-
85-
// Synced notifies the server that the ETH protocol has been synced and LES service can be started
86-
func (s *LesServer) Synced() {
87-
s.lock.Lock()
88-
defer s.lock.Unlock()
89-
90-
s.synced = true
91-
if s.srvr != nil && !s.stopped {
92-
s.protocolManager.Start(s.srvr)
93-
}
73+
s.protocolManager.Start(srvr)
9474
}
9575

9676
// Stop stops the LES service
9777
func (s *LesServer) Stop() {
98-
s.lock.Lock()
99-
defer s.lock.Unlock()
100-
101-
s.stopped = true
10278
s.fcCostStats.store()
10379
s.fcManager.Stop()
10480
go func() {

les/serverpool.go

Lines changed: 54 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,10 @@ func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
160160
defer pool.lock.Unlock()
161161
entry := pool.entries[p.ID()]
162162
if entry == nil {
163-
return nil
163+
entry = pool.findOrNewNode(p.ID(), ip, port)
164164
}
165165
glog.V(logger.Debug).Infof("connecting to %v, state: %v", p.id, entry.state)
166-
if entry.state != psDialed {
166+
if entry.state == psConnected || entry.state == psRegistered {
167167
return nil
168168
}
169169
pool.connWg.Add(1)
@@ -250,11 +250,17 @@ type poolStatAdjust struct {
250250

251251
// adjustBlockDelay adjusts the block announce delay statistics of a node
252252
func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) {
253+
if entry == nil {
254+
return
255+
}
253256
pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time}
254257
}
255258

256259
// adjustResponseTime adjusts the request response time statistics of a node
257260
func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) {
261+
if entry == nil {
262+
return
263+
}
258264
if timeout {
259265
pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time}
260266
} else {
@@ -342,7 +348,9 @@ func (pool *serverPool) selectPeerWait(reqID uint64, canSend func(*peer) (bool,
342348
func (pool *serverPool) eventLoop() {
343349
lookupCnt := 0
344350
var convTime mclock.AbsTime
345-
pool.discSetPeriod <- time.Millisecond * 100
351+
if pool.discSetPeriod != nil {
352+
pool.discSetPeriod <- time.Millisecond * 100
353+
}
346354
for {
347355
select {
348356
case entry := <-pool.timeout:
@@ -375,39 +383,7 @@ func (pool *serverPool) eventLoop() {
375383

376384
case node := <-pool.discNodes:
377385
pool.lock.Lock()
378-
now := mclock.Now()
379-
id := discover.NodeID(node.ID)
380-
entry := pool.entries[id]
381-
if entry == nil {
382-
glog.V(logger.Debug).Infof("discovered %v", node.String())
383-
entry = &poolEntry{
384-
id: id,
385-
addr: make(map[string]*poolEntryAddress),
386-
addrSelect: *newWeightedRandomSelect(),
387-
shortRetry: shortRetryCnt,
388-
}
389-
pool.entries[id] = entry
390-
// initialize previously unknown peers with good statistics to give a chance to prove themselves
391-
entry.connectStats.add(1, initStatsWeight)
392-
entry.delayStats.add(0, initStatsWeight)
393-
entry.responseStats.add(0, initStatsWeight)
394-
entry.timeoutStats.add(0, initStatsWeight)
395-
}
396-
entry.lastDiscovered = now
397-
addr := &poolEntryAddress{
398-
ip: node.IP,
399-
port: node.TCP,
400-
}
401-
if a, ok := entry.addr[addr.strKey()]; ok {
402-
addr = a
403-
} else {
404-
entry.addr[addr.strKey()] = addr
405-
}
406-
addr.lastSeen = now
407-
entry.addrSelect.update(addr)
408-
if !entry.known {
409-
pool.newQueue.setLatest(entry)
410-
}
386+
entry := pool.findOrNewNode(discover.NodeID(node.ID), node.IP, node.TCP)
411387
pool.updateCheckDial(entry)
412388
pool.lock.Unlock()
413389

@@ -419,12 +395,16 @@ func (pool *serverPool) eventLoop() {
419395
lookupCnt++
420396
if pool.fastDiscover && (lookupCnt == 50 || time.Duration(mclock.Now()-convTime) > time.Minute) {
421397
pool.fastDiscover = false
422-
pool.discSetPeriod <- time.Minute
398+
if pool.discSetPeriod != nil {
399+
pool.discSetPeriod <- time.Minute
400+
}
423401
}
424402
}
425403

426404
case <-pool.quit:
427-
close(pool.discSetPeriod)
405+
if pool.discSetPeriod != nil {
406+
close(pool.discSetPeriod)
407+
}
428408
pool.connWg.Wait()
429409
pool.saveNodes()
430410
pool.wg.Done()
@@ -434,6 +414,42 @@ func (pool *serverPool) eventLoop() {
434414
}
435415
}
436416

417+
func (pool *serverPool) findOrNewNode(id discover.NodeID, ip net.IP, port uint16) *poolEntry {
418+
now := mclock.Now()
419+
entry := pool.entries[id]
420+
if entry == nil {
421+
glog.V(logger.Debug).Infof("discovered %v", id.String())
422+
entry = &poolEntry{
423+
id: id,
424+
addr: make(map[string]*poolEntryAddress),
425+
addrSelect: *newWeightedRandomSelect(),
426+
shortRetry: shortRetryCnt,
427+
}
428+
pool.entries[id] = entry
429+
// initialize previously unknown peers with good statistics to give a chance to prove themselves
430+
entry.connectStats.add(1, initStatsWeight)
431+
entry.delayStats.add(0, initStatsWeight)
432+
entry.responseStats.add(0, initStatsWeight)
433+
entry.timeoutStats.add(0, initStatsWeight)
434+
}
435+
entry.lastDiscovered = now
436+
addr := &poolEntryAddress{
437+
ip: ip,
438+
port: port,
439+
}
440+
if a, ok := entry.addr[addr.strKey()]; ok {
441+
addr = a
442+
} else {
443+
entry.addr[addr.strKey()] = addr
444+
}
445+
addr.lastSeen = now
446+
entry.addrSelect.update(addr)
447+
if !entry.known {
448+
pool.newQueue.setLatest(entry)
449+
}
450+
return entry
451+
}
452+
437453
// loadNodes loads known nodes and their statistics from the database
438454
func (pool *serverPool) loadNodes() {
439455
enc, err := pool.db.Get(pool.dbKey)

0 commit comments

Comments
 (0)