@@ -21,10 +21,12 @@ import (
21
21
"encoding/json"
22
22
"errors"
23
23
"fmt"
24
+ gomath "math"
24
25
"math/big"
25
26
"math/rand"
26
27
"sort"
27
28
"sync"
29
+ "sync/atomic"
28
30
"time"
29
31
30
32
"github.com/ethereum/go-ethereum/common"
@@ -78,6 +80,29 @@ const (
78
80
// and waste round trip times. If it's too high, we're capping responses and
79
81
// waste bandwidth.
80
82
maxTrieRequestCount = maxRequestSize / 512
83
+
84
+ // trienodeHealRateMeasurementImpact is the impact a single measurement has on
85
+ // the local node's trienode processing capacity. A value closer to 0 reacts
86
+ // slower to sudden changes, but it is also more stable against temporary hiccups.
87
+ trienodeHealRateMeasurementImpact = 0.005
88
+
89
+ // minTrienodeHealThrottle is the minimum divisor for throttling trie node
90
+ // heal requests to avoid overloading the local node and exessively expanding
91
+ // the state trie bedth wise.
92
+ minTrienodeHealThrottle = 1
93
+
94
+ // maxTrienodeHealThrottle is the maximum divisor for throttling trie node
95
+ // heal requests to avoid overloading the local node and exessively expanding
96
+ // the state trie bedth wise.
97
+ maxTrienodeHealThrottle = maxTrieRequestCount
98
+
99
+ // trienodeHealThrottleIncrease is the multiplier for the throttle when the
100
+ // rate of arriving data is higher than the rate of processing it.
101
+ trienodeHealThrottleIncrease = 1.33
102
+
103
+ // trienodeHealThrottleDecrease is the divisor for the throttle when the
104
+ // rate of arriving data is lower than the rate of processing it.
105
+ trienodeHealThrottleDecrease = 1.25
81
106
)
82
107
83
108
var (
@@ -431,6 +456,11 @@ type Syncer struct {
431
456
trienodeHealReqs map [uint64 ]* trienodeHealRequest // Trie node requests currently running
432
457
bytecodeHealReqs map [uint64 ]* bytecodeHealRequest // Bytecode requests currently running
433
458
459
+ trienodeHealRate float64 // Average heal rate for processing trie node data
460
+ trienodeHealPend uint64 // Number of trie nodes currently pending for processing
461
+ trienodeHealThrottle float64 // Divisor for throttling the amount of trienode heal data requested
462
+ trienodeHealThrottled time.Time // Timestamp the last time the throttle was updated
463
+
434
464
trienodeHealSynced uint64 // Number of state trie nodes downloaded
435
465
trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk
436
466
trienodeHealDups uint64 // Number of state trie nodes already processed
@@ -476,9 +506,10 @@ func NewSyncer(db ethdb.KeyValueStore) *Syncer {
476
506
trienodeHealIdlers : make (map [string ]struct {}),
477
507
bytecodeHealIdlers : make (map [string ]struct {}),
478
508
479
- trienodeHealReqs : make (map [uint64 ]* trienodeHealRequest ),
480
- bytecodeHealReqs : make (map [uint64 ]* bytecodeHealRequest ),
481
- stateWriter : db .NewBatch (),
509
+ trienodeHealReqs : make (map [uint64 ]* trienodeHealRequest ),
510
+ bytecodeHealReqs : make (map [uint64 ]* bytecodeHealRequest ),
511
+ trienodeHealThrottle : maxTrienodeHealThrottle , // Tune downward instead of insta-filling with junk
512
+ stateWriter : db .NewBatch (),
482
513
483
514
extProgress : new (SyncProgress ),
484
515
}
@@ -1321,6 +1352,10 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
1321
1352
if cap > maxTrieRequestCount {
1322
1353
cap = maxTrieRequestCount
1323
1354
}
1355
+ cap = int (float64 (cap ) / s .trienodeHealThrottle )
1356
+ if cap <= 0 {
1357
+ cap = 1
1358
+ }
1324
1359
var (
1325
1360
hashes = make ([]common.Hash , 0 , cap )
1326
1361
paths = make ([]string , 0 , cap )
@@ -2090,6 +2125,10 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
2090
2125
// processTrienodeHealResponse integrates an already validated trienode response
2091
2126
// into the healer tasks.
2092
2127
func (s * Syncer ) processTrienodeHealResponse (res * trienodeHealResponse ) {
2128
+ var (
2129
+ start = time .Now ()
2130
+ fills int
2131
+ )
2093
2132
for i , hash := range res .hashes {
2094
2133
node := res .nodes [i ]
2095
2134
@@ -2098,6 +2137,8 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
2098
2137
res .task .trieTasks [res .paths [i ]] = res .hashes [i ]
2099
2138
continue
2100
2139
}
2140
+ fills ++
2141
+
2101
2142
// Push the trie node into the state syncer
2102
2143
s .trienodeHealSynced ++
2103
2144
s .trienodeHealBytes += common .StorageSize (len (node ))
@@ -2121,6 +2162,50 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
2121
2162
log .Crit ("Failed to persist healing data" , "err" , err )
2122
2163
}
2123
2164
log .Debug ("Persisted set of healing data" , "type" , "trienodes" , "bytes" , common .StorageSize (batch .ValueSize ()))
2165
+
2166
+ // Calculate the processing rate of one filled trie node
2167
+ rate := float64 (fills ) / (float64 (time .Since (start )) / float64 (time .Second ))
2168
+
2169
+ // Update the currently measured trienode queueing and processing throughput.
2170
+ //
2171
+ // The processing rate needs to be updated uniformly independent if we've
2172
+ // processed 1x100 trie nodes or 100x1 to keep the rate consistent even in
2173
+ // the face of varying network packets. As such, we cannot just measure the
2174
+ // time it took to process N trie nodes and update once, we need one update
2175
+ // per trie node.
2176
+ //
2177
+ // Naively, that would be:
2178
+ //
2179
+ // for i:=0; i<fills; i++ {
2180
+ // healRate = (1-measurementImpact)*oldRate + measurementImpact*newRate
2181
+ // }
2182
+ //
2183
+ // Essentially, a recursive expansion of HR = (1-MI)*HR + MI*NR.
2184
+ //
2185
+ // We can expand that formula for the Nth item as:
2186
+ // HR(N) = (1-MI)^N*OR + (1-MI)^(N-1)*MI*NR + (1-MI)^(N-2)*MI*NR + ... + (1-MI)^0*MI*NR
2187
+ //
2188
+ // The above is a geometric sequence that can be summed to:
2189
+ // HR(N) = (1-MI)^N*(OR-NR) + NR
2190
+ s .trienodeHealRate = gomath .Pow (1 - trienodeHealRateMeasurementImpact , float64 (fills ))* (s .trienodeHealRate - rate ) + rate
2191
+
2192
+ pending := atomic .LoadUint64 (& s .trienodeHealPend )
2193
+ if time .Since (s .trienodeHealThrottled ) > time .Second {
2194
+ // Periodically adjust the trie node throttler
2195
+ if float64 (pending ) > 2 * s .trienodeHealRate {
2196
+ s .trienodeHealThrottle *= trienodeHealThrottleIncrease
2197
+ } else {
2198
+ s .trienodeHealThrottle /= trienodeHealThrottleDecrease
2199
+ }
2200
+ if s .trienodeHealThrottle > maxTrienodeHealThrottle {
2201
+ s .trienodeHealThrottle = maxTrienodeHealThrottle
2202
+ } else if s .trienodeHealThrottle < minTrienodeHealThrottle {
2203
+ s .trienodeHealThrottle = minTrienodeHealThrottle
2204
+ }
2205
+ s .trienodeHealThrottled = time .Now ()
2206
+
2207
+ log .Debug ("Updated trie node heal throttler" , "rate" , s .trienodeHealRate , "pending" , pending , "throttle" , s .trienodeHealThrottle )
2208
+ }
2124
2209
}
2125
2210
2126
2211
// processBytecodeHealResponse integrates an already validated bytecode response
@@ -2655,10 +2740,12 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
2655
2740
2656
2741
// Cross reference the requested trienodes with the response to find gaps
2657
2742
// that the serving node is missing
2658
- hasher := sha3 .NewLegacyKeccak256 ().(crypto.KeccakState )
2659
- hash := make ([]byte , 32 )
2660
-
2661
- nodes := make ([][]byte , len (req .hashes ))
2743
+ var (
2744
+ hasher = sha3 .NewLegacyKeccak256 ().(crypto.KeccakState )
2745
+ hash = make ([]byte , 32 )
2746
+ nodes = make ([][]byte , len (req .hashes ))
2747
+ fills uint64
2748
+ )
2662
2749
for i , j := 0 , 0 ; i < len (trienodes ); i ++ {
2663
2750
// Find the next hash that we've been served, leaving misses with nils
2664
2751
hasher .Reset ()
@@ -2670,16 +2757,22 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
2670
2757
}
2671
2758
if j < len (req .hashes ) {
2672
2759
nodes [j ] = trienodes [i ]
2760
+ fills ++
2673
2761
j ++
2674
2762
continue
2675
2763
}
2676
2764
// We've either ran out of hashes, or got unrequested data
2677
2765
logger .Warn ("Unexpected healing trienodes" , "count" , len (trienodes )- i )
2766
+
2678
2767
// Signal this request as failed, and ready for rescheduling
2679
2768
s .scheduleRevertTrienodeHealRequest (req )
2680
2769
return errors .New ("unexpected healing trienode" )
2681
2770
}
2682
2771
// Response validated, send it to the scheduler for filling
2772
+ atomic .AddUint64 (& s .trienodeHealPend , fills )
2773
+ defer func () {
2774
+ atomic .AddUint64 (& s .trienodeHealPend , ^ (fills - 1 ))
2775
+ }()
2683
2776
response := & trienodeHealResponse {
2684
2777
paths : req .paths ,
2685
2778
task : req .task ,
0 commit comments