Skip to content

Commit 2d89fe0

Browse files
les: move client pool to les/vflux/server (#22495)
* les: move client pool to les/vflux/server * les/vflux/server: un-expose NodeBalance, remove unused fn, fix bugs * tests/fuzzers/vflux: add ClientPool fuzzer * les/vflux/server: fixed balance tests * les: rebase fix * les/vflux/server: fixed more bugs * les/vflux/server: unexported NodeStateMachine fields and flags * les/vflux/server: unexport all internal components and functions * les/vflux/server: fixed priorityPool test * les/vflux/server: polish balance * les/vflux/server: fixed mutex locking error * les/vflux/server: priorityPool bug fixed * common/prque: make Prque wrap-around priority handling optional * les/vflux/server: rename funcs, small optimizations * les/vflux/server: fixed timeUntil * les/vflux/server: separated balance.posValue and negValue * les/vflux/server: polish setup * les/vflux/server: enforce capacity curve monotonicity * les/vflux/server: simplified requestCapacity * les/vflux/server: requestCapacity with target range, no iterations in SetCapacity * les/vflux/server: minor changes * les/vflux/server: moved default factors to balanceTracker * les/vflux/server: set inactiveFlag in priorityPool * les/vflux/server: moved related metrics to vfs package * les/vflux/client: make priorityPool temp state logic cleaner * les/vflux/server: changed log.Crit to log.Error * add vflux fuzzer to oss-fuzz Co-authored-by: rjl493456442 <[email protected]>
1 parent e275b1a commit 2d89fe0

27 files changed

+1986
-1544
lines changed

common/prque/lazyqueue.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ type (
5555
// NewLazyQueue creates a new lazy queue
5656
func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue {
5757
q := &LazyQueue{
58-
popQueue: newSstack(nil),
58+
popQueue: newSstack(nil, false),
5959
setIndex: setIndex,
6060
priority: priority,
6161
maxPriority: maxPriority,
@@ -71,8 +71,8 @@ func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPrior
7171

7272
// Reset clears the contents of the queue
7373
func (q *LazyQueue) Reset() {
74-
q.queue[0] = newSstack(q.setIndex0)
75-
q.queue[1] = newSstack(q.setIndex1)
74+
q.queue[0] = newSstack(q.setIndex0, false)
75+
q.queue[1] = newSstack(q.setIndex1, false)
7676
}
7777

7878
// Refresh performs queue re-evaluation if necessary

common/prque/prque.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,12 @@ type Prque struct {
2828

2929
// New creates a new priority queue.
3030
func New(setIndex SetIndexCallback) *Prque {
31-
return &Prque{newSstack(setIndex)}
31+
return &Prque{newSstack(setIndex, false)}
32+
}
33+
34+
// NewWrapAround creates a new priority queue with wrap-around priority handling.
35+
func NewWrapAround(setIndex SetIndexCallback) *Prque {
36+
return &Prque{newSstack(setIndex, true)}
3237
}
3338

3439
// Pushes a value with a given priority into the queue, expanding if necessary.

common/prque/sstack.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,24 @@ type SetIndexCallback func(data interface{}, index int)
3131
// the stack (heap) functionality and the Len, Less and Swap methods for the
3232
// sortability requirements of the heaps.
3333
type sstack struct {
34-
setIndex SetIndexCallback
35-
size int
36-
capacity int
37-
offset int
34+
setIndex SetIndexCallback
35+
size int
36+
capacity int
37+
offset int
38+
wrapAround bool
3839

3940
blocks [][]*item
4041
active []*item
4142
}
4243

4344
// Creates a new, empty stack.
44-
func newSstack(setIndex SetIndexCallback) *sstack {
45+
func newSstack(setIndex SetIndexCallback, wrapAround bool) *sstack {
4546
result := new(sstack)
4647
result.setIndex = setIndex
4748
result.active = make([]*item, blockSize)
4849
result.blocks = [][]*item{result.active}
4950
result.capacity = blockSize
51+
result.wrapAround = wrapAround
5052
return result
5153
}
5254

@@ -94,7 +96,11 @@ func (s *sstack) Len() int {
9496
// Compares the priority of two elements of the stack (higher is first).
9597
// Required by sort.Interface.
9698
func (s *sstack) Less(i, j int) bool {
97-
return (s.blocks[i/blockSize][i%blockSize].priority - s.blocks[j/blockSize][j%blockSize].priority) > 0
99+
a, b := s.blocks[i/blockSize][i%blockSize].priority, s.blocks[j/blockSize][j%blockSize].priority
100+
if s.wrapAround {
101+
return a-b > 0
102+
}
103+
return a > b
98104
}
99105

100106
// Swaps two elements in the stack. Required by sort.Interface.
@@ -110,5 +116,5 @@ func (s *sstack) Swap(i, j int) {
110116

111117
// Resets the stack, effectively clearing its contents.
112118
func (s *sstack) Reset() {
113-
*s = *newSstack(s.setIndex)
119+
*s = *newSstack(s.setIndex, false)
114120
}

common/prque/sstack_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func TestSstack(t *testing.T) {
2121
for i := 0; i < size; i++ {
2222
data[i] = &item{rand.Int(), rand.Int63()}
2323
}
24-
stack := newSstack(nil)
24+
stack := newSstack(nil, false)
2525
for rep := 0; rep < 2; rep++ {
2626
// Push all the data into the stack, pop out every second
2727
secs := []*item{}
@@ -55,7 +55,7 @@ func TestSstackSort(t *testing.T) {
5555
data[i] = &item{rand.Int(), int64(i)}
5656
}
5757
// Push all the data into the stack
58-
stack := newSstack(nil)
58+
stack := newSstack(nil, false)
5959
for _, val := range data {
6060
stack.Push(val)
6161
}
@@ -76,7 +76,7 @@ func TestSstackReset(t *testing.T) {
7676
for i := 0; i < size; i++ {
7777
data[i] = &item{rand.Int(), rand.Int63()}
7878
}
79-
stack := newSstack(nil)
79+
stack := newSstack(nil, false)
8080
for rep := 0; rep < 2; rep++ {
8181
// Push all the data into the stack, pop out every second
8282
secs := []*item{}

les/api.go

Lines changed: 54 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ var (
3131
errNoCheckpoint = errors.New("no local checkpoint provided")
3232
errNotActivated = errors.New("checkpoint registrar is not activated")
3333
errUnknownBenchmarkType = errors.New("unknown benchmark type")
34-
errNoPriority = errors.New("priority too low to raise capacity")
3534
)
3635

3736
// PrivateLightServerAPI provides an API to access the LES light server.
@@ -44,8 +43,8 @@ type PrivateLightServerAPI struct {
4443
func NewPrivateLightServerAPI(server *LesServer) *PrivateLightServerAPI {
4544
return &PrivateLightServerAPI{
4645
server: server,
47-
defaultPosFactors: server.clientPool.defaultPosFactors,
48-
defaultNegFactors: server.clientPool.defaultNegFactors,
46+
defaultPosFactors: defaultPosFactors,
47+
defaultNegFactors: defaultNegFactors,
4948
}
5049
}
5150

@@ -66,7 +65,9 @@ func (api *PrivateLightServerAPI) ServerInfo() map[string]interface{} {
6665
res := make(map[string]interface{})
6766
res["minimumCapacity"] = api.server.minCapacity
6867
res["maximumCapacity"] = api.server.maxCapacity
69-
res["totalCapacity"], res["totalConnectedCapacity"], res["priorityConnectedCapacity"] = api.server.clientPool.capacityInfo()
68+
_, res["totalCapacity"] = api.server.clientPool.Limits()
69+
_, res["totalConnectedCapacity"] = api.server.clientPool.Active()
70+
res["priorityConnectedCapacity"] = 0 //TODO connect when token sale module is added
7071
return res
7172
}
7273

@@ -80,9 +81,18 @@ func (api *PrivateLightServerAPI) ClientInfo(nodes []string) map[enode.ID]map[st
8081
}
8182

8283
res := make(map[enode.ID]map[string]interface{})
83-
api.server.clientPool.forClients(ids, func(client *clientInfo) {
84-
res[client.node.ID()] = api.clientInfo(client)
85-
})
84+
if len(ids) == 0 {
85+
ids = api.server.peers.ids()
86+
}
87+
for _, id := range ids {
88+
if peer := api.server.peers.peer(id); peer != nil {
89+
res[id] = api.clientInfo(peer, peer.balance)
90+
} else {
91+
api.server.clientPool.BalanceOperation(id, "", func(balance vfs.AtomicBalanceOperator) {
92+
res[id] = api.clientInfo(nil, balance)
93+
})
94+
}
95+
}
8696
return res
8797
}
8898

@@ -94,39 +104,43 @@ func (api *PrivateLightServerAPI) ClientInfo(nodes []string) map[enode.ID]map[st
94104
// assigned to it.
95105
func (api *PrivateLightServerAPI) PriorityClientInfo(start, stop enode.ID, maxCount int) map[enode.ID]map[string]interface{} {
96106
res := make(map[enode.ID]map[string]interface{})
97-
ids := api.server.clientPool.bt.GetPosBalanceIDs(start, stop, maxCount+1)
107+
ids := api.server.clientPool.GetPosBalanceIDs(start, stop, maxCount+1)
98108
if len(ids) > maxCount {
99109
res[ids[maxCount]] = make(map[string]interface{})
100110
ids = ids[:maxCount]
101111
}
102-
if len(ids) != 0 {
103-
api.server.clientPool.forClients(ids, func(client *clientInfo) {
104-
res[client.node.ID()] = api.clientInfo(client)
105-
})
112+
for _, id := range ids {
113+
if peer := api.server.peers.peer(id); peer != nil {
114+
res[id] = api.clientInfo(peer, peer.balance)
115+
} else {
116+
api.server.clientPool.BalanceOperation(id, "", func(balance vfs.AtomicBalanceOperator) {
117+
res[id] = api.clientInfo(nil, balance)
118+
})
119+
}
106120
}
107121
return res
108122
}
109123

110124
// clientInfo creates a client info data structure
111-
func (api *PrivateLightServerAPI) clientInfo(c *clientInfo) map[string]interface{} {
125+
func (api *PrivateLightServerAPI) clientInfo(peer *clientPeer, balance vfs.ReadOnlyBalance) map[string]interface{} {
112126
info := make(map[string]interface{})
113-
pb, nb := c.balance.GetBalance()
114-
info["isConnected"] = c.connected
127+
pb, nb := balance.GetBalance()
128+
info["isConnected"] = peer != nil
115129
info["pricing/balance"] = pb
116130
info["priority"] = pb != 0
117131
// cb := api.server.clientPool.ndb.getCurrencyBalance(id)
118132
// info["pricing/currency"] = cb.amount
119-
if c.connected {
120-
info["connectionTime"] = float64(mclock.Now()-c.connectedAt) / float64(time.Second)
121-
info["capacity"], _ = api.server.clientPool.ns.GetField(c.node, priorityPoolSetup.CapacityField).(uint64)
133+
if peer != nil {
134+
info["connectionTime"] = float64(mclock.Now()-peer.connectedAt) / float64(time.Second)
135+
info["capacity"] = peer.getCapacity()
122136
info["pricing/negBalance"] = nb
123137
}
124138
return info
125139
}
126140

127141
// setParams either sets the given parameters for a single connected client (if specified)
128142
// or the default parameters applicable to clients connected in the future
129-
func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, client *clientInfo, posFactors, negFactors *vfs.PriceFactors) (updateFactors bool, err error) {
143+
func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, client *clientPeer, posFactors, negFactors *vfs.PriceFactors) (updateFactors bool, err error) {
130144
defParams := client == nil
131145
for name, value := range params {
132146
errValue := func() error {
@@ -156,9 +170,8 @@ func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, clien
156170
setFactor(&negFactors.RequestFactor)
157171
case !defParams && name == "capacity":
158172
if capacity, ok := value.(float64); ok && uint64(capacity) >= api.server.minCapacity {
159-
_, err = api.server.clientPool.setCapacity(client.node, client.address, uint64(capacity), 0, true)
160-
// Don't have to call factor update explicitly. It's already done
161-
// in setCapacity function.
173+
_, err = api.server.clientPool.SetCapacity(client.Node(), uint64(capacity), 0, false)
174+
// time factor recalculation is performed automatically by the balance tracker
162175
} else {
163176
err = errValue()
164177
}
@@ -179,39 +192,33 @@ func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, clien
179192
// SetClientParams sets client parameters for all clients listed in the ids list
180193
// or all connected clients if the list is empty
181194
func (api *PrivateLightServerAPI) SetClientParams(nodes []string, params map[string]interface{}) error {
182-
var (
183-
ids []enode.ID
184-
err error
185-
)
195+
var err error
186196
for _, node := range nodes {
187-
if id, err := parseNode(node); err != nil {
197+
var id enode.ID
198+
if id, err = parseNode(node); err != nil {
188199
return err
189-
} else {
190-
ids = append(ids, id)
191200
}
192-
}
193-
api.server.clientPool.forClients(ids, func(client *clientInfo) {
194-
if client.connected {
195-
posFactors, negFactors := client.balance.GetPriceFactors()
196-
update, e := api.setParams(params, client, &posFactors, &negFactors)
201+
if peer := api.server.peers.peer(id); peer != nil {
202+
posFactors, negFactors := peer.balance.GetPriceFactors()
203+
update, e := api.setParams(params, peer, &posFactors, &negFactors)
197204
if update {
198-
client.balance.SetPriceFactors(posFactors, negFactors)
205+
peer.balance.SetPriceFactors(posFactors, negFactors)
199206
}
200207
if e != nil {
201208
err = e
202209
}
203210
} else {
204-
err = fmt.Errorf("client %064x is not connected", client.node.ID())
211+
err = fmt.Errorf("client %064x is not connected", id)
205212
}
206-
})
213+
}
207214
return err
208215
}
209216

210217
// SetDefaultParams sets the default parameters applicable to clients connected in the future
211218
func (api *PrivateLightServerAPI) SetDefaultParams(params map[string]interface{}) error {
212219
update, err := api.setParams(params, nil, &api.defaultPosFactors, &api.defaultNegFactors)
213220
if update {
214-
api.server.clientPool.setDefaultFactors(api.defaultPosFactors, api.defaultNegFactors)
221+
api.server.clientPool.SetDefaultFactors(api.defaultPosFactors, api.defaultNegFactors)
215222
}
216223
return err
217224
}
@@ -224,7 +231,7 @@ func (api *PrivateLightServerAPI) SetConnectedBias(bias time.Duration) error {
224231
if bias < time.Duration(0) {
225232
return fmt.Errorf("bias illegal: %v less than 0", bias)
226233
}
227-
api.server.clientPool.setConnectedBias(bias)
234+
api.server.clientPool.SetConnectedBias(bias)
228235
return nil
229236
}
230237

@@ -235,8 +242,8 @@ func (api *PrivateLightServerAPI) AddBalance(node string, amount int64) (balance
235242
if id, err = parseNode(node); err != nil {
236243
return
237244
}
238-
api.server.clientPool.forClients([]enode.ID{id}, func(c *clientInfo) {
239-
balance[0], balance[1], err = c.balance.AddBalance(amount)
245+
api.server.clientPool.BalanceOperation(id, "", func(nb vfs.AtomicBalanceOperator) {
246+
balance[0], balance[1], err = nb.AddBalance(amount)
240247
})
241248
return
242249
}
@@ -338,14 +345,12 @@ func (api *PrivateDebugAPI) FreezeClient(node string) error {
338345
if id, err = parseNode(node); err != nil {
339346
return err
340347
}
341-
api.server.clientPool.forClients([]enode.ID{id}, func(c *clientInfo) {
342-
if c.connected {
343-
c.peer.freeze()
344-
} else {
345-
err = fmt.Errorf("client %064x is not connected", id[:])
346-
}
347-
})
348-
return err
348+
if peer := api.server.peers.peer(id); peer != nil {
349+
peer.freeze()
350+
return nil
351+
} else {
352+
return fmt.Errorf("client %064x is not connected", id[:])
353+
}
349354
}
350355

351356
// PrivateLightAPI provides an API to access the LES light server or light client.

0 commit comments

Comments
 (0)