Skip to content

Commit 3e617f3

Browse files
committed
les: implement light server pool
1 parent 0fe35b9 commit 3e617f3

File tree

5 files changed

+947
-45
lines changed

5 files changed

+947
-45
lines changed

les/handler.go

Lines changed: 51 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ import (
2222
"errors"
2323
"fmt"
2424
"math/big"
25+
"net"
2526
"sync"
2627
"time"
2728

2829
"github.com/ethereum/go-ethereum/common"
30+
"github.com/ethereum/go-ethereum/common/mclock"
2931
"github.com/ethereum/go-ethereum/core"
3032
"github.com/ethereum/go-ethereum/core/state"
3133
"github.com/ethereum/go-ethereum/core/types"
@@ -101,10 +103,7 @@ type ProtocolManager struct {
101103
chainDb ethdb.Database
102104
odr *LesOdr
103105
server *LesServer
104-
105-
topicDisc *discv5.Network
106-
lesTopic discv5.Topic
107-
p2pServer *p2p.Server
106+
serverPool *serverPool
108107

109108
downloader *downloader.Downloader
110109
fetcher *lightFetcher
@@ -157,13 +156,46 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
157156
Version: version,
158157
Length: ProtocolLengths[i],
159158
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
159+
var entry *poolEntry
160+
if manager.serverPool != nil {
161+
addr := p.RemoteAddr().(*net.TCPAddr)
162+
entry = manager.serverPool.connect(p.ID(), addr.IP, uint16(addr.Port))
163+
if entry == nil {
164+
return fmt.Errorf("unwanted connection")
165+
}
166+
}
160167
peer := manager.newPeer(int(version), networkId, p, rw)
168+
peer.poolEntry = entry
161169
select {
162170
case manager.newPeerCh <- peer:
163171
manager.wg.Add(1)
164172
defer manager.wg.Done()
165-
return manager.handle(peer)
173+
start := mclock.Now()
174+
err := manager.handle(peer)
175+
if entry != nil {
176+
connTime := time.Duration(mclock.Now() - start)
177+
stopped := false
178+
select {
179+
case <-manager.quitSync:
180+
stopped = true
181+
default:
182+
}
183+
//fmt.Println("connTime", peer.id, connTime, stopped, err)
184+
quality := float64(1)
185+
setQuality := true
186+
if connTime < time.Minute*10 {
187+
quality = 0
188+
if stopped {
189+
setQuality = false
190+
}
191+
}
192+
manager.serverPool.disconnect(entry, quality, setQuality)
193+
}
194+
return err
166195
case <-manager.quitSync:
196+
if entry != nil {
197+
manager.serverPool.disconnect(entry, 0, false)
198+
}
167199
return p2p.DiscQuitting
168200
}
169201
},
@@ -236,54 +268,24 @@ func (pm *ProtocolManager) removePeer(id string) {
236268
}
237269
}
238270

239-
func (pm *ProtocolManager) findServers() {
240-
if pm.p2pServer == nil || pm.topicDisc == nil {
241-
return
242-
}
243-
glog.V(logger.Debug).Infoln("Looking for topic", string(pm.lesTopic))
244-
enodes := make(chan string, 100)
245-
stop := make(chan struct{})
246-
go pm.topicDisc.SearchTopic(pm.lesTopic, stop, enodes)
247-
go func() {
248-
added := make(map[string]bool)
249-
for {
250-
select {
251-
case enode := <-enodes:
252-
if !added[enode] {
253-
glog.V(logger.Info).Infoln("Found LES server:", enode)
254-
added[enode] = true
255-
if node, err := discover.ParseNode(enode); err == nil {
256-
pm.p2pServer.AddPeer(node)
257-
}
258-
}
259-
case <-stop:
260-
return
261-
}
262-
}
263-
}()
264-
select {
265-
case <-time.After(time.Second * 20):
266-
case <-pm.quitSync:
267-
}
268-
close(stop)
269-
}
270-
271271
func (pm *ProtocolManager) Start(srvr *p2p.Server) {
272-
pm.p2pServer = srvr
272+
var topicDisc *discv5.Network
273273
if srvr != nil {
274-
pm.topicDisc = srvr.DiscV5
274+
topicDisc = srvr.DiscV5
275275
}
276-
pm.lesTopic = discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
276+
lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
277277
if pm.lightSync {
278278
// start sync handler
279-
go pm.findServers()
279+
if srvr != nil {
280+
pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg)
281+
}
280282
go pm.syncer()
281283
} else {
282-
if pm.topicDisc != nil {
284+
if topicDisc != nil {
283285
go func() {
284-
glog.V(logger.Debug).Infoln("Starting registering topic", string(pm.lesTopic))
285-
pm.topicDisc.RegisterTopic(pm.lesTopic, pm.quitSync)
286-
glog.V(logger.Debug).Infoln("Stopped registering topic", string(pm.lesTopic))
286+
glog.V(logger.Debug).Infoln("Starting registering topic", string(lesTopic))
287+
topicDisc.RegisterTopic(lesTopic, pm.quitSync)
288+
glog.V(logger.Debug).Infoln("Stopped registering topic", string(lesTopic))
287289
}()
288290
}
289291
go func() {
@@ -373,6 +375,10 @@ func (pm *ProtocolManager) handle(p *peer) error {
373375
}
374376

375377
pm.fetcher.notify(p, nil)
378+
379+
if p.poolEntry != nil {
380+
pm.serverPool.registered(p.poolEntry)
381+
}
376382
}
377383

378384
stop := make(chan struct{})

les/peer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ type peer struct {
5757

5858
announceChn chan announceData
5959

60+
poolEntry *poolEntry
61+
6062
fcClient *flowcontrol.ClientNode // nil if the peer is server only
6163
fcServer *flowcontrol.ServerNode // nil if the peer is client only
6264
fcServerParams *flowcontrol.ServerParams

les/randselect.go

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
// Copyright 2016 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
// Package les implements the Light Ethereum Subprotocol.
18+
package les
19+
20+
import (
21+
"math/rand"
22+
)
23+
24+
// wrsItem interface should be implemented by any entries that are to be selected from
25+
// a weightedRandomSelect set. Note that recalculating monotonously decreasing item
26+
// weights on-demand (without constantly calling update) is allowed
27+
type wrsItem interface {
28+
Weight() int64
29+
}
30+
31+
// weightedRandomSelect is capable of weighted random selection from a set of items
32+
type weightedRandomSelect struct {
33+
root *wrsNode
34+
idx map[wrsItem]int
35+
}
36+
37+
// newWeightedRandomSelect returns a new weightedRandomSelect structure
38+
func newWeightedRandomSelect() *weightedRandomSelect {
39+
return &weightedRandomSelect{root: &wrsNode{maxItems: wrsBranches}, idx: make(map[wrsItem]int)}
40+
}
41+
42+
// update updates an item's weight, adds it if it was non-existent or removes it if
43+
// the new weight is zero. Note that explicitly updating decreasing weights is not necessary.
44+
func (w *weightedRandomSelect) update(item wrsItem) {
45+
w.setWeight(item, item.Weight())
46+
}
47+
48+
// remove removes an item from the set
49+
func (w *weightedRandomSelect) remove(item wrsItem) {
50+
w.setWeight(item, 0)
51+
}
52+
53+
// setWeight sets an item's weight to a specific value (removes it if zero)
54+
func (w *weightedRandomSelect) setWeight(item wrsItem, weight int64) {
55+
idx, ok := w.idx[item]
56+
if ok {
57+
w.root.setWeight(idx, weight)
58+
if weight == 0 {
59+
delete(w.idx, item)
60+
}
61+
} else {
62+
if weight != 0 {
63+
if w.root.itemCnt == w.root.maxItems {
64+
// add a new level
65+
newRoot := &wrsNode{sumWeight: w.root.sumWeight, itemCnt: w.root.itemCnt, level: w.root.level + 1, maxItems: w.root.maxItems * wrsBranches}
66+
newRoot.items[0] = w.root
67+
newRoot.weights[0] = w.root.sumWeight
68+
w.root = newRoot
69+
}
70+
w.idx[item] = w.root.insert(item, weight)
71+
}
72+
}
73+
}
74+
75+
// choose randomly selects an item from the set, with a chance proportional to its
76+
// current weight. If the weight of the chosen element has been decreased since the
77+
// last stored value, returns it with a newWeight/oldWeight chance, otherwise just
78+
// updates its weight and selects another one
79+
func (w *weightedRandomSelect) choose() wrsItem {
80+
for {
81+
if w.root.sumWeight == 0 {
82+
return nil
83+
}
84+
val := rand.Int63n(w.root.sumWeight)
85+
choice, lastWeight := w.root.choose(val)
86+
weight := choice.Weight()
87+
if weight != lastWeight {
88+
w.setWeight(choice, weight)
89+
}
90+
if weight >= lastWeight || rand.Int63n(lastWeight) < weight {
91+
return choice
92+
}
93+
}
94+
}
95+
96+
const wrsBranches = 8 // max number of branches in the wrsNode tree
97+
98+
// wrsNode is a node of a tree structure that can store wrsItems or further wrsNodes.
99+
type wrsNode struct {
100+
items [wrsBranches]interface{}
101+
weights [wrsBranches]int64
102+
sumWeight int64
103+
level, itemCnt, maxItems int
104+
}
105+
106+
// insert recursively inserts a new item to the tree and returns the item index
107+
func (n *wrsNode) insert(item wrsItem, weight int64) int {
108+
branch := 0
109+
for n.items[branch] != nil && (n.level == 0 || n.items[branch].(*wrsNode).itemCnt == n.items[branch].(*wrsNode).maxItems) {
110+
branch++
111+
if branch == wrsBranches {
112+
panic(nil)
113+
}
114+
}
115+
n.itemCnt++
116+
n.sumWeight += weight
117+
n.weights[branch] += weight
118+
if n.level == 0 {
119+
n.items[branch] = item
120+
return branch
121+
} else {
122+
var subNode *wrsNode
123+
if n.items[branch] == nil {
124+
subNode = &wrsNode{maxItems: n.maxItems / wrsBranches, level: n.level - 1}
125+
n.items[branch] = subNode
126+
} else {
127+
subNode = n.items[branch].(*wrsNode)
128+
}
129+
subIdx := subNode.insert(item, weight)
130+
return subNode.maxItems*branch + subIdx
131+
}
132+
}
133+
134+
// setWeight updates the weight of a certain item (which should exist) and returns
135+
// the change of the last weight value stored in the tree
136+
func (n *wrsNode) setWeight(idx int, weight int64) int64 {
137+
if n.level == 0 {
138+
oldWeight := n.weights[idx]
139+
n.weights[idx] = weight
140+
diff := weight - oldWeight
141+
n.sumWeight += diff
142+
if weight == 0 {
143+
n.items[idx] = nil
144+
n.itemCnt--
145+
}
146+
return diff
147+
}
148+
branchItems := n.maxItems / wrsBranches
149+
branch := idx / branchItems
150+
diff := n.items[branch].(*wrsNode).setWeight(idx-branch*branchItems, weight)
151+
n.weights[branch] += diff
152+
n.sumWeight += diff
153+
if weight == 0 {
154+
n.itemCnt--
155+
}
156+
return diff
157+
}
158+
159+
// choose recursively selects an item from the tree and returns it along with its weight
160+
func (n *wrsNode) choose(val int64) (wrsItem, int64) {
161+
for i, w := range n.weights {
162+
if val < w {
163+
if n.level == 0 {
164+
return n.items[i].(wrsItem), n.weights[i]
165+
} else {
166+
return n.items[i].(*wrsNode).choose(val)
167+
}
168+
} else {
169+
val -= w
170+
}
171+
}
172+
panic(nil)
173+
}

les/randselect_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright 2016 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package les
18+
19+
import (
20+
"math/rand"
21+
"testing"
22+
)
23+
24+
type testWrsItem struct {
25+
idx int
26+
widx *int
27+
}
28+
29+
func (t *testWrsItem) Weight() int64 {
30+
w := *t.widx
31+
if w == -1 || w == t.idx {
32+
return int64(t.idx + 1)
33+
}
34+
return 0
35+
}
36+
37+
func TestWeightedRandomSelect(t *testing.T) {
38+
testFn := func(cnt int) {
39+
s := newWeightedRandomSelect()
40+
w := -1
41+
list := make([]testWrsItem, cnt)
42+
for i, _ := range list {
43+
list[i] = testWrsItem{idx: i, widx: &w}
44+
s.update(&list[i])
45+
}
46+
w = rand.Intn(cnt)
47+
c := s.choose()
48+
if c == nil {
49+
t.Errorf("expected item, got nil")
50+
} else {
51+
if c.(*testWrsItem).idx != w {
52+
t.Errorf("expected another item")
53+
}
54+
}
55+
w = -2
56+
if s.choose() != nil {
57+
t.Errorf("expected nil, got item")
58+
}
59+
}
60+
testFn(1)
61+
testFn(10)
62+
testFn(100)
63+
testFn(1000)
64+
testFn(10000)
65+
testFn(100000)
66+
testFn(1000000)
67+
}

0 commit comments

Comments
 (0)