Skip to content

Commit 8be8ba4

Browse files
authored
les/vflux: fixed panic and data races (#23865)
* les/vflux/server: fix BalanceOperation * les/vflux/client: fixed data races
1 parent 476fb56 commit 8be8ba4

File tree

3 files changed

+23
-12
lines changed

3 files changed

+23
-12
lines changed

les/vflux/client/serverpool_test.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package client
1919
import (
2020
"math/rand"
2121
"strconv"
22+
"sync"
2223
"sync/atomic"
2324
"testing"
2425
"time"
@@ -52,7 +53,7 @@ func testNodeIndex(id enode.ID) int {
5253
type ServerPoolTest struct {
5354
db ethdb.KeyValueStore
5455
clock *mclock.Simulated
55-
quit chan struct{}
56+
quit chan chan struct{}
5657
preNeg, preNegFail bool
5758
vt *ValueTracker
5859
sp *ServerPool
@@ -62,6 +63,8 @@ type ServerPoolTest struct {
6263
trusted []string
6364
waitCount, waitEnded int32
6465

66+
lock sync.Mutex
67+
6568
cycle, conn, servedConn int
6669
serviceCycles, dialCount int
6770
disconnect map[int][]int
@@ -112,7 +115,9 @@ func (s *ServerPoolTest) start() {
112115
testQuery = func(node *enode.Node) int {
113116
idx := testNodeIndex(node.ID())
114117
n := &s.testNodes[idx]
118+
s.lock.Lock()
115119
canConnect := !n.connected && n.connectCycles != 0 && s.cycle >= n.nextConnCycle
120+
s.lock.Unlock()
116121
if s.preNegFail {
117122
// simulate a scenario where UDP queries never work
118123
s.beginWait()
@@ -155,7 +160,7 @@ func (s *ServerPoolTest) start() {
155160
s.sp.unixTime = func() int64 { return int64(s.clock.Now()) / int64(time.Second) }
156161
s.disconnect = make(map[int][]int)
157162
s.sp.Start()
158-
s.quit = make(chan struct{})
163+
s.quit = make(chan chan struct{})
159164
go func() {
160165
last := int32(-1)
161166
for {
@@ -167,15 +172,18 @@ func (s *ServerPoolTest) start() {
167172
s.clock.Run(time.Second)
168173
}
169174
last = c
170-
case <-s.quit:
175+
case quit := <-s.quit:
176+
close(quit)
171177
return
172178
}
173179
}
174180
}()
175181
}
176182

177183
func (s *ServerPoolTest) stop() {
178-
close(s.quit)
184+
quit := make(chan struct{})
185+
s.quit <- quit
186+
<-quit
179187
s.sp.Stop()
180188
s.spi.Close()
181189
for i := range s.testNodes {
@@ -234,7 +242,9 @@ func (s *ServerPoolTest) run() {
234242
}
235243
s.serviceCycles += s.servedConn
236244
s.clock.Run(time.Second)
245+
s.lock.Lock()
237246
s.cycle++
247+
s.lock.Unlock()
238248
}
239249
}
240250

les/vflux/client/valuetracker.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,22 +50,22 @@ type NodeValueTracker struct {
5050
lastTransfer mclock.AbsTime
5151
basket serverBasket
5252
reqCosts []uint64
53-
reqValues *[]float64
53+
reqValues []float64
5454
}
5555

5656
// UpdateCosts updates the node value tracker's request cost table
5757
func (nv *NodeValueTracker) UpdateCosts(reqCosts []uint64) {
5858
nv.vt.lock.Lock()
5959
defer nv.vt.lock.Unlock()
6060

61-
nv.updateCosts(reqCosts, &nv.vt.refBasket.reqValues, nv.vt.refBasket.reqValueFactor(reqCosts))
61+
nv.updateCosts(reqCosts, nv.vt.refBasket.reqValues, nv.vt.refBasket.reqValueFactor(reqCosts))
6262
}
6363

6464
// updateCosts updates the request cost table of the server. The request value factor
6565
// is also updated based on the given cost table and the current reference basket.
6666
// Note that the contents of the referenced reqValues slice will not change; a new
6767
// reference is passed if the values are updated by ValueTracker.
68-
func (nv *NodeValueTracker) updateCosts(reqCosts []uint64, reqValues *[]float64, rvFactor float64) {
68+
func (nv *NodeValueTracker) updateCosts(reqCosts []uint64, reqValues []float64, rvFactor float64) {
6969
nv.lock.Lock()
7070
defer nv.lock.Unlock()
7171

@@ -112,7 +112,7 @@ func (nv *NodeValueTracker) Served(reqs []ServedRequest, respTime time.Duration)
112112
var value float64
113113
for _, r := range reqs {
114114
nv.basket.add(r.ReqType, r.Amount, nv.reqCosts[r.ReqType]*uint64(r.Amount), expFactor)
115-
value += (*nv.reqValues)[r.ReqType] * float64(r.Amount)
115+
value += nv.reqValues[r.ReqType] * float64(r.Amount)
116116
}
117117
nv.rtStats.Add(respTime, value, expFactor)
118118
}
@@ -356,7 +356,7 @@ func (vt *ValueTracker) Register(id enode.ID) *NodeValueTracker {
356356
reqTypeCount := len(vt.refBasket.reqValues)
357357
nv.reqCosts = make([]uint64, reqTypeCount)
358358
nv.lastTransfer = vt.clock.Now()
359-
nv.reqValues = &vt.refBasket.reqValues
359+
nv.reqValues = vt.refBasket.reqValues
360360
nv.basket.init(reqTypeCount)
361361

362362
vt.connected[id] = nv
@@ -476,7 +476,7 @@ func (vt *ValueTracker) periodicUpdate() {
476476
vt.refBasket.normalize()
477477
vt.refBasket.updateReqValues()
478478
for _, nv := range vt.connected {
479-
nv.updateCosts(nv.reqCosts, &vt.refBasket.reqValues, vt.refBasket.reqValueFactor(nv.reqCosts))
479+
nv.updateCosts(nv.reqCosts, vt.refBasket.reqValues, vt.refBasket.reqValueFactor(nv.reqCosts))
480480
}
481481
vt.saveToDb()
482482
}

les/vflux/server/balance_tracker.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,9 @@ func (bt *balanceTracker) BalanceOperation(id enode.ID, connAddress string, cb f
223223
var nb *nodeBalance
224224
if node := bt.ns.GetNode(id); node != nil {
225225
nb, _ = bt.ns.GetField(node, bt.setup.balanceField).(*nodeBalance)
226-
} else {
227-
node = enode.SignNull(&enr.Record{}, id)
226+
}
227+
if nb == nil {
228+
node := enode.SignNull(&enr.Record{}, id)
228229
nb = bt.newNodeBalance(node, connAddress, false)
229230
}
230231
cb(nb)

0 commit comments

Comments
 (0)