
Commit 59a3198

rjl493456442 authored and karalabe committed
les: remove half-finished priority pool APIs (#19780)
* les: remove half-finished APIs
1 parent 8d2cf02 commit 59a3198

File tree

10 files changed: +153, -890 lines

les/api.go

Lines changed: 5 additions & 451 deletions
Large diffs are not rendered by default.

les/costtracker.go

Lines changed: 78 additions & 57 deletions
@@ -18,7 +18,6 @@ package les
 
 import (
 	"encoding/binary"
-	"fmt"
 	"math"
 	"sync"
 	"sync/atomic"
@@ -27,7 +26,6 @@ import (
 	"github.com/ethereum/go-ethereum/common/mclock"
 	"github.com/ethereum/go-ethereum/eth"
 	"github.com/ethereum/go-ethereum/ethdb"
-	"github.com/ethereum/go-ethereum/les/csvlogger"
 	"github.com/ethereum/go-ethereum/les/flowcontrol"
 	"github.com/ethereum/go-ethereum/log"
 )
@@ -96,40 +94,50 @@ const (
 // as the number of cost units per nanosecond of serving time in a single thread.
 // It is based on statistics collected during serving requests in high-load periods
 // and practically acts as a one-dimension request price scaling factor over the
-// pre-defined cost estimate table. Instead of scaling the cost values, the real
-// value of cost units is changed by applying the factor to the serving times. This
-// is more convenient because the changes in the cost factor can be applied immediately
-// without always notifying the clients about the changed cost tables.
+// pre-defined cost estimate table.
+//
+// The reason for dynamically maintaining the global factor on the server side is:
+// the estimated time cost of a request is fixed (hardcoded), but the configuration
+// of the machine running the server differs. Therefore, the request serving time
+// varies greatly between machines, and even on the same machine it varies with
+// the request pressure.
+//
+// In order to limit resources more effectively, we apply the global factor to the
+// serving time so that the result stays as close as possible to the estimated time
+// cost, no matter whether the server is slow or fast. We also scale totalRecharge
+// with the global factor so that a fast server can serve more requests than the
+// estimation and a slow server can reduce the request pressure.
+//
+// Instead of scaling the cost values, the real value of cost units is changed by
+// applying the factor to the serving times. This is more convenient because the
+// changes in the cost factor can be applied immediately without always notifying
+// the clients about the changed cost tables.
 type costTracker struct {
 	db     ethdb.Database
 	stopCh chan chan struct{}
 
-	inSizeFactor, outSizeFactor float64
-	gf, utilTarget              float64
-	minBufLimit                 uint64
+	inSizeFactor  float64
+	outSizeFactor float64
+	factor        float64
+	utilTarget    float64
+	minBufLimit   uint64
 
-	gfUpdateCh      chan gfUpdate
 	gfLock          sync.RWMutex
+	reqInfoCh       chan reqInfo
 	totalRechargeCh chan uint64
 
-	stats map[uint64][]uint64
-	logger *csvlogger.Logger
-	logRecentTime, logRecentAvg, logTotalRecharge, logRelCost *csvlogger.Channel
+	stats map[uint64][]uint64 // Used for testing purposes.
 }
 
 // newCostTracker creates a cost tracker and loads the cost factor statistics from the database.
 // It also returns the minimum capacity that can be assigned to any peer.
-func newCostTracker(db ethdb.Database, config *eth.Config, logger *csvlogger.Logger) (*costTracker, uint64) {
+func newCostTracker(db ethdb.Database, config *eth.Config) (*costTracker, uint64) {
 	utilTarget := float64(config.LightServ) * flowcontrol.FixedPointMultiplier / 100
 	ct := &costTracker{
-		db:               db,
-		stopCh:           make(chan chan struct{}),
-		utilTarget:       utilTarget,
-		logger:           logger,
-		logRelCost:       logger.NewMinMaxChannel("relativeCost", true),
-		logRecentTime:    logger.NewMinMaxChannel("recentTime", true),
-		logRecentAvg:     logger.NewMinMaxChannel("recentAvg", true),
-		logTotalRecharge: logger.NewChannel("totalRecharge", 0.01),
+		db:         db,
+		stopCh:     make(chan chan struct{}),
+		reqInfoCh:  make(chan reqInfo, 100),
+		utilTarget: utilTarget,
 	}
 	if config.LightBandwidthIn > 0 {
 		ct.inSizeFactor = utilTarget / float64(config.LightBandwidthIn)
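Editor's note: the new comment block above explains the role of the global factor in words. As a rough illustration of that reasoning (not code from this commit, and with invented numbers), the factor converts measured serving time into cost units and scales the recharge budget in the same direction, so a fast machine ends up with a larger factor and a slow one with a smaller factor:

```go
package main

import "fmt"

// Illustrative only: applying the factor to measured serving time keeps the
// charged cost close to the hardcoded estimate, and scaling totalRecharge the
// same way lets a fast server accept proportionally more load.
func main() {
	const utilTarget = 1000000.0 // allowed cost units per second (invented value)
	const estimatedCost = 800.0  // hardcoded estimate for one request, in cost units

	for _, factor := range []float64{2.0 /* fast machine */, 0.5 /* slow machine */} {
		servingTime := estimatedCost / factor // what such a machine actually measures, in ns
		chargedCost := servingTime * factor   // cost charged after applying the factor
		totalRecharge := utilTarget * factor  // recharge budget scaled with the factor

		fmt.Printf("factor=%.1f servingTime=%.0fns charged=%.0f recharge=%.0f\n",
			factor, servingTime, chargedCost, totalRecharge)
	}
}
```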
@@ -204,8 +212,15 @@ func (ct *costTracker) makeCostList(globalFactor float64) RequestCostList {
 	return list
 }
 
-type gfUpdate struct {
-	avgTimeCost, servingTime float64
+// reqInfo contains the estimated time cost and the actual request serving time
+// which acts as a feed source to update factor maintained by costTracker.
+type reqInfo struct {
+	// avgTimeCost is the estimated time cost corresponding to maxCostTable.
+	avgTimeCost float64
+
+	// servingTime is the CPU time corresponding to the actual processing of
+	// the request.
+	servingTime float64
 }
 
 // gfLoop starts an event loop which updates the global cost factor which is
@@ -218,43 +233,48 @@ type gfUpdate struct {
 // total allowed serving time per second but nominated in cost units, should
 // also be scaled with the cost factor and is also updated by this loop.
 func (ct *costTracker) gfLoop() {
-	var gfLog, recentTime, recentAvg float64
-	lastUpdate := mclock.Now()
-	expUpdate := lastUpdate
+	var (
+		factor, totalRecharge        float64
+		gfLog, recentTime, recentAvg float64
+
+		lastUpdate, expUpdate = mclock.Now(), mclock.Now()
+	)
 
+	// Load historical cost factor statistics from the database.
 	data, _ := ct.db.Get([]byte(gfDbKey))
 	if len(data) == 8 {
 		gfLog = math.Float64frombits(binary.BigEndian.Uint64(data[:]))
 	}
-	gf := math.Exp(gfLog)
-	ct.gf = gf
-	totalRecharge := ct.utilTarget * gf
-	ct.gfUpdateCh = make(chan gfUpdate, 100)
-	threshold := gfUsageThreshold * float64(gfUsageTC) * ct.utilTarget / 1000000
+	ct.factor = math.Exp(gfLog)
+	factor, totalRecharge = ct.factor, ct.utilTarget*ct.factor
+
+	// In order to gather factor statistics under high request pressure,
+	// we only adjust the factor when the recent factor usage is beyond the threshold.
+	threshold := gfUsageThreshold * float64(gfUsageTC) * ct.utilTarget / flowcontrol.FixedPointMultiplier
 
 	go func() {
 		saveCostFactor := func() {
 			var data [8]byte
 			binary.BigEndian.PutUint64(data[:], math.Float64bits(gfLog))
 			ct.db.Put([]byte(gfDbKey), data[:])
-			log.Debug("global cost factor saved", "value", gf)
+			log.Debug("global cost factor saved", "value", factor)
 		}
 		saveTicker := time.NewTicker(time.Minute * 10)
 
 		for {
 			select {
-			case r := <-ct.gfUpdateCh:
+			case r := <-ct.reqInfoCh:
+				requestServedMeter.Mark(int64(r.servingTime))
+				requestEstimatedMeter.Mark(int64(r.avgTimeCost / factor))
+				requestServedTimer.Update(time.Duration(r.servingTime))
+				relativeCostHistogram.Update(int64(r.avgTimeCost / factor / r.servingTime))
+
 				now := mclock.Now()
-				if ct.logRelCost != nil && r.avgTimeCost > 1e-20 {
-					ct.logRelCost.Update(r.servingTime * gf / r.avgTimeCost)
-				}
-				if r.servingTime > 1000000000 {
-					ct.logger.Event(fmt.Sprintf("Very long servingTime = %f avgTimeCost = %f costFactor = %f", r.servingTime, r.avgTimeCost, gf))
-				}
 				dt := float64(now - expUpdate)
 				expUpdate = now
 				exp := math.Exp(-dt / float64(gfUsageTC))
-				// calculate gf correction until now, based on previous values
+
+				// calculate factor correction until now, based on previous values
 				var gfCorr float64
 				max := recentTime
 				if recentAvg > max {
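Editor's note: the requestServedMeter, requestEstimatedMeter, requestServedTimer and relativeCostHistogram instruments used in the new case body are defined elsewhere in the package (les/metrics.go, not rendered in this commit view), as are the gauges used later. A minimal sketch of how such instruments could be registered with go-ethereum's metrics package is given below; the metric names are placeholders, not necessarily the ones used in the repository:

```go
package les

import "github.com/ethereum/go-ethereum/metrics"

// Hypothetical registrations mirroring the instruments used in gfLoop above.
// The names are illustrative; the real definitions live in les/metrics.go.
var (
	requestServedMeter    = metrics.NewRegisteredMeter("les/server/req/served", nil)
	requestEstimatedMeter = metrics.NewRegisteredMeter("les/server/req/estimated", nil)
	requestServedTimer    = metrics.NewRegisteredTimer("les/server/req/servedTime", nil)
	relativeCostHistogram = metrics.NewRegisteredHistogram("les/server/req/relative", nil, metrics.NewExpDecaySample(1028, 0.015))

	recentServedGauge    = metrics.NewRegisteredGauge("les/server/recent/served", nil)
	recentEstimatedGauge = metrics.NewRegisteredGauge("les/server/recent/estimated", nil)
	totalRechargeGauge   = metrics.NewRegisteredGauge("les/server/totalRecharge", nil)
)
```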
@@ -268,27 +288,28 @@ func (ct *costTracker) gfLoop() {
 					} else {
 						gfCorr = math.Log(max/threshold) * float64(gfUsageTC)
 					}
-					// calculate log(gf) correction with the right direction and time constant
+					// calculate log(factor) correction with the right direction and time constant
 					if recentTime > recentAvg {
-						// drop gf if actual serving times are larger than average estimates
+						// drop factor if actual serving times are larger than average estimates
 						gfCorr /= -float64(gfDropTC)
 					} else {
-						// raise gf if actual serving times are smaller than average estimates
+						// raise factor if actual serving times are smaller than average estimates
 						gfCorr /= float64(gfRaiseTC)
 					}
 				}
 				// update recent cost values with current request
 				recentTime = recentTime*exp + r.servingTime
-				recentAvg = recentAvg*exp + r.avgTimeCost/gf
+				recentAvg = recentAvg*exp + r.avgTimeCost/factor
 
 				if gfCorr != 0 {
+					// Apply the correction to the factor.
 					gfLog += gfCorr
-					gf = math.Exp(gfLog)
+					factor = math.Exp(gfLog)
+					// Notify outside modules about the new factor and totalRecharge.
 					if time.Duration(now-lastUpdate) > time.Second {
-						totalRecharge = ct.utilTarget * gf
-						lastUpdate = now
+						totalRecharge, lastUpdate = ct.utilTarget*factor, now
 						ct.gfLock.Lock()
-						ct.gf = gf
+						ct.factor = factor
 						ch := ct.totalRechargeCh
 						ct.gfLock.Unlock()
 						if ch != nil {
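Editor's note: the correction above works in log space: recent usage is low-pass filtered with time constant gfUsageTC, and gfLog is nudged with separate raise and drop time constants so the measured serving time tracks the estimate divided by the factor. The standalone sketch below reproduces only that smoothing step with invented, exaggerated constants and ignores the usage threshold that gates the correction in the real loop:

```go
package main

import (
	"fmt"
	"math"
)

// Simplified rework of one gfLoop update: recent values decay exponentially,
// and the factor is adjusted in log space depending on whether actual serving
// time exceeds the (factor-scaled) estimate.
func main() {
	const (
		usageTC = 1e9  // decay time constant, in ns (invented)
		raiseTC = 1e10 // time constant for raising the factor (invented)
		dropTC  = 1e9  // time constant for dropping the factor (invented)
	)
	var (
		gfLog                 float64 // log of the global factor
		recentTime, recentAvg float64
	)
	factor := math.Exp(gfLog) // starts at 1.0

	// One request arrives dt nanoseconds after the previous update.
	dt, servingTime, avgTimeCost := 5e8, 1200.0, 800.0

	exp := math.Exp(-dt / usageTC)
	recentTime = recentTime*exp + servingTime
	recentAvg = recentAvg*exp + avgTimeCost/factor

	// Here the request was served slower than estimated (recentTime > recentAvg),
	// so the correction is negative and the factor drops.
	gfCorr := dt
	if recentTime > recentAvg {
		gfCorr /= -dropTC
	} else {
		gfCorr /= raiseTC
	}
	gfLog += gfCorr
	factor = math.Exp(gfLog)
	fmt.Printf("factor after one update: %.6f\n", factor)
}
```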
@@ -297,12 +318,12 @@ func (ct *costTracker) gfLoop() {
 							default:
 							}
 						}
-						log.Debug("global cost factor updated", "gf", gf)
+						log.Debug("global cost factor updated", "factor", factor)
 					}
 				}
-				ct.logRecentTime.Update(recentTime)
-				ct.logRecentAvg.Update(recentAvg)
-				ct.logTotalRecharge.Update(totalRecharge)
+				recentServedGauge.Update(int64(recentTime))
+				recentEstimatedGauge.Update(int64(recentAvg))
+				totalRechargeGauge.Update(int64(totalRecharge))
 
 			case <-saveTicker.C:
 				saveCostFactor()
@@ -321,7 +342,7 @@ func (ct *costTracker) globalFactor() float64 {
 	ct.gfLock.RLock()
 	defer ct.gfLock.RUnlock()
 
-	return ct.gf
+	return ct.factor
 }
 
 // totalRecharge returns the current total recharge parameter which is used by
@@ -330,7 +351,7 @@ func (ct *costTracker) totalRecharge() uint64 {
 	ct.gfLock.RLock()
 	defer ct.gfLock.RUnlock()
 
-	return uint64(ct.gf * ct.utilTarget)
+	return uint64(ct.factor * ct.utilTarget)
 }
 
 // subscribeTotalRecharge returns all future updates to the total recharge value
@@ -340,7 +361,7 @@ func (ct *costTracker) subscribeTotalRecharge(ch chan uint64) uint64 {
 	defer ct.gfLock.Unlock()
 
 	ct.totalRechargeCh = ch
-	return uint64(ct.gf * ct.utilTarget)
+	return uint64(ct.factor * ct.utilTarget)
 }
 
 // updateStats updates the global cost factor and (if enabled) the real cost vs.
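Editor's note: subscribeTotalRecharge hands back the current value and registers the channel that gfLoop notifies, non-blockingly, whenever the factor changes significantly (see the select in the loop above). A hedged sketch of how a consumer inside the les package might use it follows; the helper and its stop channel are invented for illustration and are not part of this commit:

```go
package les

import "github.com/ethereum/go-ethereum/log"

// watchRecharge is an invented helper showing how a component could consume
// the totalRecharge updates published by gfLoop.
func watchRecharge(ct *costTracker, stop chan struct{}) {
	ch := make(chan uint64, 10)
	current := ct.subscribeTotalRecharge(ch)
	log.Debug("Initial total recharge", "value", current)

	for {
		select {
		case v := <-ch:
			// React to the new recharge budget, e.g. re-derive client capacities.
			log.Debug("Total recharge updated", "value", v)
		case <-stop:
			return
		}
	}
}
```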
@@ -349,7 +370,7 @@ func (ct *costTracker) updateStats(code, amount, servingTime, realCost uint64) {
 	avg := reqAvgTimeCost[code]
 	avgTimeCost := avg.baseCost + amount*avg.reqCost
 	select {
-	case ct.gfUpdateCh <- gfUpdate{float64(avgTimeCost), float64(servingTime)}:
+	case ct.reqInfoCh <- reqInfo{float64(avgTimeCost), float64(servingTime)}:
 	default:
 	}
 	if makeCostStats {
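Editor's note: updateStats feeds the loop through a buffered channel (100 entries, per newCostTracker above) guarded by a select with a default case, so request processing never blocks on the statistics path; when the buffer is full the sample is simply dropped. A generic sketch of that pattern, independent of this codebase:

```go
package main

import "fmt"

// Non-blocking send: drop the sample instead of stalling the hot path when
// the consumer falls behind. Mirrors the select/default used in updateStats.
func main() {
	samples := make(chan int, 2)

	for i := 0; i < 4; i++ {
		select {
		case samples <- i:
			fmt.Println("queued", i)
		default:
			fmt.Println("dropped", i) // buffer full, hot path keeps going
		}
	}
}
```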
