Skip to content

Commit 85d81b2

Browse files
authored
les: remove clientPeerSet and serverSet (ethereum#21566)
* les: move NodeStateMachine from clientPool to LesServer * les: new header broadcaster * les: peerCommons.headInfo always contains last announced head * les: remove clientPeerSet and serverSet * les: fixed panic * les: fixed --nodiscover option * les: disconnect all peers at ns.Stop() * les: added comments and fixed signed broadcasts * les: removed unused parameter, fixed tests
1 parent 3e82c9e commit 85d81b2

File tree

10 files changed

+239
-332
lines changed

10 files changed

+239
-332
lines changed

les/client_handler.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,7 @@ func (h *clientHandler) handle(p *serverPeer) error {
102102
p.Log().Debug("Light Ethereum peer connected", "name", p.Name())
103103

104104
// Execute the LES handshake
105-
var (
106-
head = h.backend.blockchain.CurrentHeader()
107-
hash = head.Hash()
108-
number = head.Number.Uint64()
109-
td = h.backend.blockchain.GetTd(hash, number)
110-
)
111-
if err := p.Handshake(td, hash, number, h.backend.blockchain.Genesis().Hash(), nil); err != nil {
105+
if err := p.Handshake(h.backend.blockchain.Genesis().Hash()); err != nil {
112106
p.Log().Debug("Light Ethereum handshake failed", "err", err)
113107
return err
114108
}

les/clientpool.go

Lines changed: 16 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package les
1818

1919
import (
2020
"fmt"
21-
"reflect"
2221
"sync"
2322
"time"
2423

@@ -46,19 +45,6 @@ const (
4645
inactiveTimeout = time.Second * 10
4746
)
4847

49-
var (
50-
clientPoolSetup = &nodestate.Setup{}
51-
clientField = clientPoolSetup.NewField("clientInfo", reflect.TypeOf(&clientInfo{}))
52-
connAddressField = clientPoolSetup.NewField("connAddr", reflect.TypeOf(""))
53-
balanceTrackerSetup = lps.NewBalanceTrackerSetup(clientPoolSetup)
54-
priorityPoolSetup = lps.NewPriorityPoolSetup(clientPoolSetup)
55-
)
56-
57-
func init() {
58-
balanceTrackerSetup.Connect(connAddressField, priorityPoolSetup.CapacityField)
59-
priorityPoolSetup.Connect(balanceTrackerSetup.BalanceField, balanceTrackerSetup.UpdateFlag) // NodeBalance implements nodePriority
60-
}
61-
6248
// clientPool implements a client database that assigns a priority to each client
6349
// based on a positive and negative balance. Positive balance is externally assigned
6450
// to prioritized clients and is decreased with connection time and processed
@@ -119,8 +105,7 @@ type clientInfo struct {
119105
}
120106

121107
// newClientPool creates a new client pool
122-
func newClientPool(lespayDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID)) *clientPool {
123-
ns := nodestate.NewNodeStateMachine(nil, nil, clock, clientPoolSetup)
108+
func newClientPool(ns *nodestate.NodeStateMachine, lespayDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID)) *clientPool {
124109
pool := &clientPool{
125110
ns: ns,
126111
BalanceTrackerSetup: balanceTrackerSetup,
@@ -147,7 +132,7 @@ func newClientPool(lespayDb ethdb.Database, minCap uint64, connectedBias time.Du
147132
})
148133

149134
ns.SubscribeState(pool.ActiveFlag.Or(pool.PriorityFlag), func(node *enode.Node, oldState, newState nodestate.Flags) {
150-
c, _ := ns.GetField(node, clientField).(*clientInfo)
135+
c, _ := ns.GetField(node, clientInfoField).(*clientInfo)
151136
if c == nil {
152137
return
153138
}
@@ -172,7 +157,7 @@ func newClientPool(lespayDb ethdb.Database, minCap uint64, connectedBias time.Du
172157
if oldState.Equals(pool.ActiveFlag) && newState.Equals(pool.InactiveFlag) {
173158
clientDeactivatedMeter.Mark(1)
174159
log.Debug("Client deactivated", "id", node.ID())
175-
c, _ := ns.GetField(node, clientField).(*clientInfo)
160+
c, _ := ns.GetField(node, clientInfoField).(*clientInfo)
176161
if c == nil || !c.peer.allowInactive() {
177162
pool.removePeer(node.ID())
178163
}
@@ -190,13 +175,11 @@ func newClientPool(lespayDb ethdb.Database, minCap uint64, connectedBias time.Du
190175
newCap, _ := newValue.(uint64)
191176
totalConnected += newCap - oldCap
192177
totalConnectedGauge.Update(int64(totalConnected))
193-
c, _ := ns.GetField(node, clientField).(*clientInfo)
178+
c, _ := ns.GetField(node, clientInfoField).(*clientInfo)
194179
if c != nil {
195180
c.peer.updateCapacity(newCap)
196181
}
197182
})
198-
199-
ns.Start()
200183
return pool
201184
}
202185

@@ -210,7 +193,6 @@ func (f *clientPool) stop() {
210193
f.disconnectNode(node)
211194
})
212195
f.bt.Stop()
213-
f.ns.Stop()
214196
}
215197

216198
// connect should be called after a successful handshake. If the connection was
@@ -225,7 +207,7 @@ func (f *clientPool) connect(peer clientPoolPeer) (uint64, error) {
225207
}
226208
// Dedup connected peers.
227209
node, freeID := peer.Node(), peer.freeClientId()
228-
if f.ns.GetField(node, clientField) != nil {
210+
if f.ns.GetField(node, clientInfoField) != nil {
229211
log.Debug("Client already connected", "address", freeID, "id", node.ID().String())
230212
return 0, fmt.Errorf("Client already connected address=%s id=%s", freeID, node.ID().String())
231213
}
@@ -237,7 +219,7 @@ func (f *clientPool) connect(peer clientPoolPeer) (uint64, error) {
237219
connected: true,
238220
connectedAt: now,
239221
}
240-
f.ns.SetField(node, clientField, c)
222+
f.ns.SetField(node, clientInfoField, c)
241223
f.ns.SetField(node, connAddressField, freeID)
242224
if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*lps.NodeBalance); c.balance == nil {
243225
f.disconnect(peer)
@@ -280,7 +262,7 @@ func (f *clientPool) disconnect(p clientPoolPeer) {
280262
// disconnectNode removes node fields and flags related to connected status
281263
func (f *clientPool) disconnectNode(node *enode.Node) {
282264
f.ns.SetField(node, connAddressField, nil)
283-
f.ns.SetField(node, clientField, nil)
265+
f.ns.SetField(node, clientInfoField, nil)
284266
}
285267

286268
// setDefaultFactors sets the default price factors applied to subsequently connected clients
@@ -299,7 +281,8 @@ func (f *clientPool) capacityInfo() (uint64, uint64, uint64) {
299281
defer f.lock.Unlock()
300282

301283
// total priority active cap will be supported when the token issuer module is added
302-
return f.capLimit, f.pp.ActiveCapacity(), 0
284+
_, activeCap := f.pp.Active()
285+
return f.capLimit, activeCap, 0
303286
}
304287

305288
// setLimits sets the maximum number and total capacity of connected clients,
@@ -314,21 +297,21 @@ func (f *clientPool) setLimits(totalConn int, totalCap uint64) {
314297

315298
// setCapacity sets the assigned capacity of a connected client
316299
func (f *clientPool) setCapacity(node *enode.Node, freeID string, capacity uint64, bias time.Duration, setCap bool) (uint64, error) {
317-
c, _ := f.ns.GetField(node, clientField).(*clientInfo)
300+
c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo)
318301
if c == nil {
319302
if setCap {
320303
return 0, fmt.Errorf("client %064x is not connected", node.ID())
321304
}
322305
c = &clientInfo{node: node}
323-
f.ns.SetField(node, clientField, c)
306+
f.ns.SetField(node, clientInfoField, c)
324307
f.ns.SetField(node, connAddressField, freeID)
325308
if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*lps.NodeBalance); c.balance == nil {
326309
log.Error("BalanceField is missing", "node", node.ID())
327310
return 0, fmt.Errorf("BalanceField of %064x is missing", node.ID())
328311
}
329312
defer func() {
330313
f.ns.SetField(node, connAddressField, nil)
331-
f.ns.SetField(node, clientField, nil)
314+
f.ns.SetField(node, clientInfoField, nil)
332315
}()
333316
}
334317
var (
@@ -370,7 +353,7 @@ func (f *clientPool) forClients(ids []enode.ID, cb func(client *clientInfo)) {
370353

371354
if len(ids) == 0 {
372355
f.ns.ForEach(nodestate.Flags{}, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
373-
c, _ := f.ns.GetField(node, clientField).(*clientInfo)
356+
c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo)
374357
if c != nil {
375358
cb(c)
376359
}
@@ -381,20 +364,20 @@ func (f *clientPool) forClients(ids []enode.ID, cb func(client *clientInfo)) {
381364
if node == nil {
382365
node = enode.SignNull(&enr.Record{}, id)
383366
}
384-
c, _ := f.ns.GetField(node, clientField).(*clientInfo)
367+
c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo)
385368
if c != nil {
386369
cb(c)
387370
} else {
388371
c = &clientInfo{node: node}
389-
f.ns.SetField(node, clientField, c)
372+
f.ns.SetField(node, clientInfoField, c)
390373
f.ns.SetField(node, connAddressField, "")
391374
if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*lps.NodeBalance); c.balance != nil {
392375
cb(c)
393376
} else {
394377
log.Error("BalanceField is missing")
395378
}
396379
f.ns.SetField(node, connAddressField, nil)
397-
f.ns.SetField(node, clientField, nil)
380+
f.ns.SetField(node, clientInfoField, nil)
398381
}
399382
}
400383
}

les/clientpool_test.go

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ type poolTestPeer struct {
6464
inactiveAllowed bool
6565
}
6666

67+
func testStateMachine() *nodestate.NodeStateMachine {
68+
return nodestate.NewNodeStateMachine(nil, nil, mclock.System{}, serverSetup)
69+
70+
}
71+
6772
func newPoolTestPeer(i int, disconnCh chan int) *poolTestPeer {
6873
return &poolTestPeer{
6974
index: i,
@@ -91,7 +96,7 @@ func (i *poolTestPeer) allowInactive() bool {
9196
}
9297

9398
func getBalance(pool *clientPool, p *poolTestPeer) (pos, neg uint64) {
94-
temp := pool.ns.GetField(p.node, clientField) == nil
99+
temp := pool.ns.GetField(p.node, clientInfoField) == nil
95100
if temp {
96101
pool.ns.SetField(p.node, connAddressField, p.freeClientId())
97102
}
@@ -128,8 +133,9 @@ func testClientPool(t *testing.T, activeLimit, clientCount, paidCount int, rando
128133
disconnFn = func(id enode.ID) {
129134
disconnCh <- int(id[0]) + int(id[1])<<8
130135
}
131-
pool = newClientPool(db, 1, 0, &clock, disconnFn)
136+
pool = newClientPool(testStateMachine(), db, 1, 0, &clock, disconnFn)
132137
)
138+
pool.ns.Start()
133139

134140
pool.setLimits(activeLimit, uint64(activeLimit))
135141
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1})
@@ -233,7 +239,8 @@ func TestConnectPaidClient(t *testing.T) {
233239
clock mclock.Simulated
234240
db = rawdb.NewMemoryDatabase()
235241
)
236-
pool := newClientPool(db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
242+
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
243+
pool.ns.Start()
237244
defer pool.stop()
238245
pool.setLimits(10, uint64(10))
239246
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1})
@@ -248,7 +255,8 @@ func TestConnectPaidClientToSmallPool(t *testing.T) {
248255
clock mclock.Simulated
249256
db = rawdb.NewMemoryDatabase()
250257
)
251-
pool := newClientPool(db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
258+
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
259+
pool.ns.Start()
252260
defer pool.stop()
253261
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
254262
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1})
@@ -266,7 +274,8 @@ func TestConnectPaidClientToFullPool(t *testing.T) {
266274
db = rawdb.NewMemoryDatabase()
267275
)
268276
removeFn := func(enode.ID) {} // Noop
269-
pool := newClientPool(db, 1, defaultConnectedBias, &clock, removeFn)
277+
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
278+
pool.ns.Start()
270279
defer pool.stop()
271280
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
272281
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1})
@@ -295,7 +304,8 @@ func TestPaidClientKickedOut(t *testing.T) {
295304
removeFn := func(id enode.ID) {
296305
kickedCh <- int(id[0])
297306
}
298-
pool := newClientPool(db, 1, defaultConnectedBias, &clock, removeFn)
307+
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
308+
pool.ns.Start()
299309
pool.bt.SetExpirationTCs(0, 0)
300310
defer pool.stop()
301311
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
@@ -325,7 +335,8 @@ func TestConnectFreeClient(t *testing.T) {
325335
clock mclock.Simulated
326336
db = rawdb.NewMemoryDatabase()
327337
)
328-
pool := newClientPool(db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
338+
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
339+
pool.ns.Start()
329340
defer pool.stop()
330341
pool.setLimits(10, uint64(10))
331342
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1})
@@ -341,7 +352,8 @@ func TestConnectFreeClientToFullPool(t *testing.T) {
341352
db = rawdb.NewMemoryDatabase()
342353
)
343354
removeFn := func(enode.ID) {} // Noop
344-
pool := newClientPool(db, 1, defaultConnectedBias, &clock, removeFn)
355+
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
356+
pool.ns.Start()
345357
defer pool.stop()
346358
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
347359
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1})
@@ -370,7 +382,8 @@ func TestFreeClientKickedOut(t *testing.T) {
370382
kicked = make(chan int, 100)
371383
)
372384
removeFn := func(id enode.ID) { kicked <- int(id[0]) }
373-
pool := newClientPool(db, 1, defaultConnectedBias, &clock, removeFn)
385+
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
386+
pool.ns.Start()
374387
defer pool.stop()
375388
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
376389
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1})
@@ -411,7 +424,8 @@ func TestPositiveBalanceCalculation(t *testing.T) {
411424
kicked = make(chan int, 10)
412425
)
413426
removeFn := func(id enode.ID) { kicked <- int(id[0]) } // Noop
414-
pool := newClientPool(db, 1, defaultConnectedBias, &clock, removeFn)
427+
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
428+
pool.ns.Start()
415429
defer pool.stop()
416430
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
417431
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1})
@@ -434,7 +448,8 @@ func TestDowngradePriorityClient(t *testing.T) {
434448
kicked = make(chan int, 10)
435449
)
436450
removeFn := func(id enode.ID) { kicked <- int(id[0]) } // Noop
437-
pool := newClientPool(db, 1, defaultConnectedBias, &clock, removeFn)
451+
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
452+
pool.ns.Start()
438453
defer pool.stop()
439454
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
440455
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1, CapacityFactor: 0, RequestFactor: 1})
@@ -468,7 +483,8 @@ func TestNegativeBalanceCalculation(t *testing.T) {
468483
clock mclock.Simulated
469484
db = rawdb.NewMemoryDatabase()
470485
)
471-
pool := newClientPool(db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
486+
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
487+
pool.ns.Start()
472488
defer pool.stop()
473489
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
474490
pool.setDefaultFactors(lps.PriceFactors{TimeFactor: 1e-3, CapacityFactor: 0, RequestFactor: 1}, lps.PriceFactors{TimeFactor: 1e-3, CapacityFactor: 0, RequestFactor: 1})
@@ -503,7 +519,8 @@ func TestInactiveClient(t *testing.T) {
503519
clock mclock.Simulated
504520
db = rawdb.NewMemoryDatabase()
505521
)
506-
pool := newClientPool(db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
522+
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
523+
pool.ns.Start()
507524
defer pool.stop()
508525
pool.setLimits(2, uint64(2))
509526

les/enr_entry.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func (e lesEntry) ENRKey() string {
3636

3737
// setupDiscovery creates the node discovery source for the eth protocol.
3838
func (eth *LightEthereum) setupDiscovery(cfg *p2p.Config) (enode.Iterator, error) {
39-
if /*cfg.NoDiscovery || */ len(eth.config.DiscoveryURLs) == 0 {
39+
if cfg.NoDiscovery || len(eth.config.DiscoveryURLs) == 0 {
4040
return nil, nil
4141
}
4242
client := dnsdisc.NewClient(dnsdisc.Config{})

les/lespay/server/prioritypool.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,12 +253,12 @@ func (pp *PriorityPool) SetActiveBias(bias time.Duration) {
253253
pp.tryActivate()
254254
}
255255

256-
// ActiveCapacity returns the total capacity of currently active nodes
257-
func (pp *PriorityPool) ActiveCapacity() uint64 {
256+
// Active returns the number and total capacity of currently active nodes
257+
func (pp *PriorityPool) Active() (uint64, uint64) {
258258
pp.lock.Lock()
259259
defer pp.lock.Unlock()
260260

261-
return pp.activeCap
261+
return pp.activeCount, pp.activeCap
262262
}
263263

264264
// inactiveSetIndex callback updates ppNodeInfo item index in inactiveQueue

0 commit comments

Comments
 (0)