Skip to content

Commit a7de796

Browse files
authored
les: implement new client pool (#19745)
1 parent 947f5f2 commit a7de796

15 files changed

+1651
-586
lines changed

common/mclock/mclock.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ type Clock interface {
4242
Now() AbsTime
4343
Sleep(time.Duration)
4444
After(time.Duration) <-chan time.Time
45+
AfterFunc(d time.Duration, f func()) Event
46+
}
47+
48+
// Event represents a cancellable event returned by AfterFunc.
type Event interface {
	// Cancel stops the event if it has not fired yet. It reports whether the
	// event was cancelled before firing (the time.Timer.Stop contract, as
	// implemented by SystemEvent).
	Cancel() bool
}
4652

4753
// System implements Clock using the system clock.
@@ -61,3 +67,16 @@ func (System) Sleep(d time.Duration) {
6167
// After implements Clock by delegating to time.After.
func (System) After(d time.Duration) <-chan time.Time {
	return time.After(d)
}
70+
71+
// AfterFunc implements Clock.
// It arms a system timer that invokes f after duration d and returns a
// cancellable handle wrapping the underlying time.Timer.
func (System) AfterFunc(d time.Duration, f func()) Event {
	return (*SystemEvent)(time.AfterFunc(d, f))
}
75+
76+
// SystemEvent implements Event using time.Timer.
77+
type SystemEvent time.Timer
78+
79+
// Cancel implements Event.
80+
func (e *SystemEvent) Cancel() bool {
81+
return (*time.Timer)(e).Stop()
82+
}

common/mclock/simclock.go

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,30 +35,44 @@ type Simulated struct {
3535
scheduled []event
3636
mu sync.RWMutex
3737
cond *sync.Cond
38+
lastId uint64
3839
}
3940

4041
// event is a callback scheduled on the simulated clock's timeline.
type event struct {
	do func()  // callback to run when the clock reaches at
	at AbsTime // absolute simulated time at which the event fires
	id uint64  // insertion counter; breaks ties between events with equal at
}
46+
47+
// SimulatedEvent implements Event for a virtual clock.
// It identifies the scheduled callback by its (at, id) key so that Cancel
// can locate and remove it from the owning clock's schedule.
type SimulatedEvent struct {
	at AbsTime    // scheduled firing time
	id uint64     // unique insertion id (tie-breaker and lookup key)
	s  *Simulated // the clock the event was scheduled on
}
4453

4554
// Run moves the clock by the given duration, executing all timers before that duration.
func (s *Simulated) Run(d time.Duration) {
	s.mu.Lock()
	s.init()

	end := s.now + AbsTime(d)
	// Collect the due callbacks while holding the lock, but invoke them only
	// after releasing it, so a callback may safely call back into the clock
	// (e.g. AfterFunc or Cancel) without deadlocking.
	var do []func()
	for len(s.scheduled) > 0 {
		ev := s.scheduled[0]
		if ev.at > end {
			break
		}
		// Advance the clock to each event's firing time in order.
		s.now = ev.at
		do = append(do, ev.do)
		s.scheduled = s.scheduled[1:]
	}
	s.now = end
	s.mu.Unlock()

	for _, fn := range do {
		fn()
	}
}
6377

6478
func (s *Simulated) ActiveTimers() int {
@@ -94,36 +108,69 @@ func (s *Simulated) Sleep(d time.Duration) {
94108
// After implements Clock.
95109
func (s *Simulated) After(d time.Duration) <-chan time.Time {
96110
after := make(chan time.Time, 1)
97-
s.insert(d, func() {
111+
s.AfterFunc(d, func() {
98112
after <- (time.Time{}).Add(time.Duration(s.now))
99113
})
100114
return after
101115
}
102116

// AfterFunc implements Clock.
// It schedules do to run when the simulated time reaches now+d and returns
// a handle that can cancel the pending call.
func (s *Simulated) AfterFunc(d time.Duration, do func()) Event {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.init()

	at := s.now + AbsTime(d)
	// Each event gets a unique, monotonically increasing id so that events
	// scheduled for the same instant fire in insertion order and can later
	// be located by Cancel.
	s.lastId++
	id := s.lastId
	// Binary search for the insertion point; the schedule is kept sorted by
	// the (at, id) pair.
	l, h := 0, len(s.scheduled)
	ll := h
	for l != h {
		m := (l + h) / 2
		if (at < s.scheduled[m].at) || ((at == s.scheduled[m].at) && (id < s.scheduled[m].id)) {
			h = m
		} else {
			l = m + 1
		}
	}
	// Shift the tail right by one slot and insert the new event at l.
	s.scheduled = append(s.scheduled, event{})
	copy(s.scheduled[l+1:], s.scheduled[l:ll])
	e := event{do: do, at: at, id: id}
	s.scheduled[l] = e
	// Notify any goroutines waiting on the clock's condition variable.
	s.cond.Broadcast()
	return &SimulatedEvent{at: at, id: id, s: s}
}
124143

125144
// init lazily creates the condition variable used to signal schedule
// changes. Callers must hold s.mu.
func (s *Simulated) init() {
	if s.cond == nil {
		s.cond = sync.NewCond(&s.mu)
	}
}
149+
150+
// Cancel implements Event.
// It removes the event from the clock's schedule and reports whether it was
// still pending (false if it already fired or was cancelled before).
func (e *SimulatedEvent) Cancel() bool {
	s := e.s
	s.mu.Lock()
	defer s.mu.Unlock()

	// Binary search using the same (at, id) ordering that AfterFunc inserts
	// with, exiting early as soon as a probe lands on the exact id.
	l, h := 0, len(s.scheduled)
	ll := h
	for l != h {
		m := (l + h) / 2
		if e.id == s.scheduled[m].id {
			l = m
			break
		}
		if (e.at < s.scheduled[m].at) || ((e.at == s.scheduled[m].at) && (e.id < s.scheduled[m].id)) {
			h = m
		} else {
			l = m + 1
		}
	}
	if l >= ll || s.scheduled[l].id != e.id {
		// Not found: the event has already fired or been removed.
		return false
	}
	// Delete element l by shifting the tail left and truncating.
	copy(s.scheduled[l:ll-1], s.scheduled[l+1:])
	s.scheduled = s.scheduled[:ll-1]
	return true
}

common/prque/lazyqueue.go

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
// Copyright 2019 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package prque
18+
19+
import (
20+
"container/heap"
21+
"time"
22+
23+
"github.com/ethereum/go-ethereum/common/mclock"
24+
)
25+
26+
// LazyQueue is a priority queue data structure where priorities can change over
// time and are only evaluated on demand.
// Two callbacks are required:
//   - priority evaluates the actual priority of an item
//   - maxPriority gives an upper estimate for the priority in any moment between
//     now and the given absolute time
//
// If the upper estimate is exceeded then Update should be called for that item.
// A global Refresh function should also be called periodically.
type LazyQueue struct {
	clock mclock.Clock
	// Items are stored in one of two internal queues ordered by estimated max
	// priority until the next and the next-after-next refresh. Update and Refresh
	// always places items in queue[1].
	queue       [2]*sstack
	popQueue    *sstack        // scratch heap used by MultiPop, ordered by actual priority
	period      time.Duration  // refresh period; maxUntil advances by this much per Refresh
	maxUntil    mclock.AbsTime // estimation horizon for entries placed in queue[1]
	indexOffset int            // 0 or 1; flipped by Refresh so stored virtual indices stay valid
	setIndex    SetIndexCallback
	priority    PriorityCallback
	maxPriority MaxPriorityCallback
}

type (
	// PriorityCallback evaluates the actual priority of an item at time now.
	PriorityCallback func(data interface{}, now mclock.AbsTime) int64
	// MaxPriorityCallback returns an upper estimate of an item's priority
	// valid at any moment up to the given absolute time.
	MaxPriorityCallback func(data interface{}, until mclock.AbsTime) int64
)
53+
54+
// NewLazyQueue creates a new lazy queue
55+
func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue {
56+
q := &LazyQueue{
57+
popQueue: newSstack(nil),
58+
setIndex: setIndex,
59+
priority: priority,
60+
maxPriority: maxPriority,
61+
clock: clock,
62+
period: refreshPeriod}
63+
q.Reset()
64+
q.Refresh()
65+
return q
66+
}
67+
68+
// Reset clears the contents of the queue
69+
func (q *LazyQueue) Reset() {
70+
q.queue[0] = newSstack(q.setIndex0)
71+
q.queue[1] = newSstack(q.setIndex1)
72+
}
73+
74+
// Refresh should be called at least with the frequency specified by the refreshPeriod parameter
// of NewLazyQueue. It advances the estimation horizon and re-estimates the
// max priority of every item whose previous estimate has expired.
func (q *LazyQueue) Refresh() {
	q.maxUntil = q.clock.Now() + mclock.AbsTime(q.period)
	// queue[0] holds the items whose estimates expire first; re-push them so
	// Push recomputes their estimate against the new horizon (into queue[1]).
	for q.queue[0].Len() != 0 {
		q.Push(heap.Pop(q.queue[0]).(*item).value)
	}
	// Swap the roles of the two queues and flip the index parity so that
	// previously handed-out virtual indices keep resolving correctly
	// (Remove decodes them via index&1^indexOffset).
	q.queue[0], q.queue[1] = q.queue[1], q.queue[0]
	q.indexOffset = 1 - q.indexOffset
	// Items now sitting in queue[1] were estimated one period further ahead.
	q.maxUntil += mclock.AbsTime(q.period)
}
84+
85+
// Push adds an item to the queue.
// The item always goes into queue[1], with a max-priority estimate valid
// until the current horizon q.maxUntil.
func (q *LazyQueue) Push(data interface{}) {
	heap.Push(q.queue[1], &item{data, q.maxPriority(data, q.maxUntil)})
}
89+
90+
// Update updates the upper priority estimate for the item with the given queue index
91+
func (q *LazyQueue) Update(index int) {
92+
q.Push(q.Remove(index))
93+
}
94+
95+
// Pop removes and returns the item with the greatest actual priority,
// along with that priority. It is implemented as a MultiPop that stops
// after the first item.
func (q *LazyQueue) Pop() (interface{}, int64) {
	var (
		resData interface{}
		resPri  int64
	)
	q.MultiPop(func(data interface{}, priority int64) bool {
		resData = data
		resPri = priority
		return false // stop after the first (highest-priority) item
	})
	return resData, resPri
}
108+
109+
// peekIndex returns the index of the internal queue where the item with the
110+
// highest estimated priority is or -1 if both are empty
111+
func (q *LazyQueue) peekIndex() int {
112+
if q.queue[0].Len() != 0 {
113+
if q.queue[1].Len() != 0 && q.queue[1].blocks[0][0].priority > q.queue[0].blocks[0][0].priority {
114+
return 1
115+
}
116+
return 0
117+
}
118+
if q.queue[1].Len() != 0 {
119+
return 1
120+
}
121+
return -1
122+
}
123+
124+
// MultiPop pops multiple items from the queue and is more efficient than calling
// Pop multiple times. Popped items are passed to the callback. MultiPop returns
// when the callback returns false or there are no more items to pop.
func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) bool) {
	now := q.clock.Now()
	nextIndex := q.peekIndex()
	for nextIndex != -1 {
		// Move the item with the best *estimated* priority into popQueue,
		// where it is ordered by its *actual* priority at `now`.
		data := heap.Pop(q.queue[nextIndex]).(*item).value
		heap.Push(q.popQueue, &item{data, q.priority(data, now)})
		nextIndex = q.peekIndex()
		// Deliver items from popQueue while their actual priority beats the
		// best remaining estimate: estimates are upper bounds, so no unseen
		// item can outrank what we deliver here.
		for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) {
			i := heap.Pop(q.popQueue).(*item)
			if !callback(i.value, i.priority) {
				// Callback aborted: re-push the already evaluated items so
				// nothing is lost.
				for q.popQueue.Len() != 0 {
					q.Push(heap.Pop(q.popQueue).(*item).value)
				}
				return
			}
		}
	}
}
145+
146+
// PopItem pops the item from the queue only, dropping the associated priority value.
147+
func (q *LazyQueue) PopItem() interface{} {
148+
i, _ := q.Pop()
149+
return i
150+
}
151+
152+
// Remove removes the item with the given virtual index and returns its data.
// A negative index is a no-op and yields nil. The virtual index encodes the
// internal queue in its lowest bit (adjusted by indexOffset) and the heap
// position in the remaining bits; see setIndex0/setIndex1.
func (q *LazyQueue) Remove(index int) interface{} {
	if index < 0 {
		return nil
	}
	return heap.Remove(q.queue[index&1^q.indexOffset], index>>1).(*item).value
}
159+
160+
// Empty checks whether the priority queue is empty.
161+
func (q *LazyQueue) Empty() bool {
162+
return q.queue[0].Len() == 0 && q.queue[1].Len() == 0
163+
}
164+
165+
// Size returns the number of items in the priority queue
// (the combined length of both internal queues).
func (q *LazyQueue) Size() int {
	return q.queue[0].Len() + q.queue[1].Len()
}
169+
170+
// setIndex0 translates internal queue item index to the virtual index space of LazyQueue.
// Items in queue[0] map to even virtual indices; the removal marker -1 is
// passed through unchanged.
func (q *LazyQueue) setIndex0(data interface{}, index int) {
	if index == -1 {
		q.setIndex(data, -1)
	} else {
		q.setIndex(data, index+index) // 2*index: even slots
	}
}
178+
179+
// setIndex1 translates internal queue item index to the virtual index space of LazyQueue.
// Items in queue[1] map to odd virtual indices; note that the removal marker
// -1 also maps to itself ((-1)+(-1)+1 == -1), so no special case is needed.
func (q *LazyQueue) setIndex1(data interface{}, index int) {
	q.setIndex(data, index+index+1) // 2*index+1: odd slots
}

0 commit comments

Comments
 (0)