@@ -54,14 +54,15 @@ var (
blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired
- headerTargetRTT = time.Second // [eth/62] Target time for completing a header retrieval request (only for measurements for now)
- headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out
- bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request
- bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired
- receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request
- receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired
- stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request
- stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired
+ rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests
+ rttMaxEstimate = 20 * time.Second // Maximum round-trip time to target for download requests
+ rttMinConfidence = 0.1 // Worst confidence factor in our estimated RTT value
+ ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion
+ ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts
+
+ qosTuningPeers = 5 // Number of peers to tune based on (best peers)
+ qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
+ qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value

maxQueuedHashes = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
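For context: peers.medianRTT(), consumed by the new qosTuner loop further down but implemented in peer.go outside this diff, presumably takes the median RTT of the qosTuningPeers best peers and clamps it into [rttMinEstimate, rttMaxEstimate]. A self-contained sketch of that idea (the function body here is an assumption, not the actual peer.go code):

package main

import (
	"fmt"
	"sort"
	"time"
)

const (
	rttMinEstimate = 2 * time.Second
	rttMaxEstimate = 20 * time.Second
	qosTuningPeers = 5
)

// medianRTT sketches what a peer-set median could look like: take the
// median of the qosTuningPeers fastest peers (or of everyone, if fewer
// are connected) and clamp it into the allowed estimate range.
func medianRTT(rtts []time.Duration) time.Duration {
	sorted := append([]time.Duration(nil), rtts...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })

	median := rttMaxEstimate // pessimistic default with no peers
	if len(sorted) >= qosTuningPeers {
		median = sorted[qosTuningPeers/2] // median of the best peers only
	} else if len(sorted) > 0 {
		median = sorted[len(sorted)/2]
	}
	if median < rttMinEstimate {
		median = rttMinEstimate
	}
	if median > rttMaxEstimate {
		median = rttMaxEstimate
	}
	return median
}

func main() {
	peers := []time.Duration{800 * time.Millisecond, time.Second, 3 * time.Second, 4 * time.Second, 30 * time.Second}
	fmt.Println(medianRTT(peers)) // 3s: median of the 5 best peers, within the clamp
}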
@@ -113,7 +114,8 @@ type Downloader struct {
fsPivotLock *types.Header // Pivot header on critical section entry (cannot change between retries)
fsPivotFails int // Number of fast sync failures in the critical section
- interrupt int32 // Atomic boolean to signal termination
+ rttEstimate uint64 // Round trip time to target for download requests
+ rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)

// Statistics
syncStatsChainOrigin uint64 // Origin block number where syncing started at
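rttConfidence packs a fractional 0..1 factor into a uint64 as millionths because package sync/atomic offers no floating point operations; 1000000 encodes 1.0. A toy illustration of the encoding (names are mine, not from the diff):

package main

import (
	"fmt"
	"sync/atomic"
)

// confidence is a 0..1 factor stored as millionths in a uint64 so that
// concurrent readers and writers can use sync/atomic; 1000000 is 1.0.
var confidence uint64 = 1000000

// scaleConfidence rescales the factor in integer space. Like the diff's
// own QoS code, the load-modify-store is not a compare-and-swap: the
// value is advisory, so a lost update is harmless.
func scaleConfidence(num, den uint64) {
	c := atomic.LoadUint64(&confidence) * num / den
	atomic.StoreUint64(&confidence, c)
}

func main() {
	scaleConfidence(1, 2) // halve it, as a second joining peer would
	fmt.Printf("%.3f\n", float64(atomic.LoadUint64(&confidence))/1000000.0) // 0.500
}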
@@ -159,6 +161,9 @@ type Downloader struct {
cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers

+ quitCh chan struct{} // Quit channel to signal termination
+ quitLock sync.RWMutex // Lock to prevent double closes
+
// Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
@@ -172,11 +177,13 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha
headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn,
insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader {

- return &Downloader{
+ dl := &Downloader{
mode: FullSync,
mux: mux,
queue: newQueue(stateDb),
peers: newPeerSet(),
+ rttEstimate: uint64(rttMaxEstimate),
+ rttConfidence: uint64(1000000),
hasHeader: hasHeader,
hasBlockAndState: hasBlockAndState,
getHeader: getHeader,
@@ -203,7 +210,10 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha
receiptWakeCh: make(chan bool, 1),
stateWakeCh: make(chan bool, 1),
headerProcCh: make(chan []*types.Header, 1),
+ quitCh: make(chan struct{}),
}
+ go dl.qosTuner()
+ return dl
}

// Progress retrieves the synchronisation boundaries, specifically the origin
@@ -250,6 +260,8 @@ func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
glog.V(logger.Error).Infoln("Register failed:", err)
return err
}
+ d.qosReduceConfidence()
+
return nil
}
@@ -515,7 +527,16 @@ func (d *Downloader) cancel() {
// Terminate interrupts the downloader, canceling all pending operations.
// The downloader cannot be reused after calling Terminate.
func (d *Downloader) Terminate() {
- atomic.StoreInt32(&d.interrupt, 1)
+ // Close the termination channel (make sure double close is allowed)
+ d.quitLock.Lock()
+ select {
+ case <-d.quitCh:
+ default:
+ close(d.quitCh)
+ }
+ d.quitLock.Unlock()
+
+ // Cancel any pending download requests
d.cancel()
}
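The quit channel replaces the old atomic interrupt flag: a closed channel is observable by any number of goroutines via select, and the lock plus select-with-default makes a second Terminate call a no-op instead of a double-close panic. The same pattern in isolation (a minimal sketch, simplified to a plain Mutex, not the Downloader itself):

package main

import (
	"fmt"
	"sync"
)

type service struct {
	quitCh   chan struct{}
	quitLock sync.Mutex
}

// stop closes quitCh exactly once; a bare second close would panic.
func (s *service) stop() {
	s.quitLock.Lock()
	defer s.quitLock.Unlock()
	select {
	case <-s.quitCh: // already closed, nothing to do
	default:
		close(s.quitCh)
	}
}

// stopped is the non-blocking check worker loops can poll; compare the
// processContent hunk below.
func (s *service) stopped() bool {
	select {
	case <-s.quitCh:
		return true
	default:
		return false
	}
}

func main() {
	s := &service{quitCh: make(chan struct{})}
	s.stop()
	s.stop()                 // safe: the select sees the closed channel
	fmt.Println(s.stopped()) // true
}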
@@ -932,7 +953,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// Reserve a chunk of hashes for a peer. A nil can mean either that
// no more hashes are available, or that the peer is known not to
// have them.
- request := d.queue.ReserveBlocks(peer, peer.BlockCapacity())
+ request := d.queue.ReserveBlocks(peer, peer.BlockCapacity(blockTargetRTT))
if request == nil {
continue
}
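BlockCapacity (and the other *Capacity helpers changed further down) now takes the target RTT, so a request's size can scale with the peer's measured throughput: roughly items-per-second times the time we are willing to wait. The peer-side implementation is not part of this diff; a hedged sketch of the idea, with invented names and an arbitrary cap:

package main

import (
	"fmt"
	"time"
)

// capacityFor estimates how many items to request from a peer so the reply
// arrives within targetRTT, given measured throughput in items/second.
// The name and the cap are illustrative, not lifted from geth's peer.go.
func capacityFor(throughput float64, targetRTT time.Duration, max int) int {
	n := int(throughput * targetRTT.Seconds())
	if n < 1 {
		n = 1 // always ask for at least one item
	}
	if n > max {
		n = max // respect the per-request protocol limit
	}
	return n
}

func main() {
	// A peer serving ~40 blocks/s, targeting a 1.5s turnaround, capped at 128:
	fmt.Println(capacityFor(40, 1500*time.Millisecond, 128)) // 60
}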
@@ -973,7 +994,7 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
// Request the advertised remote head block and wait for the response
go p.getRelHeaders(p.head, 1, 0, false)

- timeout := time.After(headerTTL)
+ timeout := time.After(d.requestTTL())
for {
select {
case <-d.cancelCh:
@@ -1041,7 +1062,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {

// Wait for the remote response to the head fetch
number, hash := uint64(0), common.Hash{}
- timeout := time.After(hashTTL)
+ timeout := time.After(d.requestTTL())

for finished := false; !finished; {
select {
@@ -1118,7 +1139,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// Split our chain interval in two, and request the hash to cross check
check := (start + end) / 2

- timeout := time.After(hashTTL)
+ timeout := time.After(d.requestTTL())
go p.getAbsHeaders(uint64(check), 1, 0, false)

// Wait until a reply arrives to this request
@@ -1199,7 +1220,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {

getHeaders := func(from uint64) {
request = time.Now()
- timeout.Reset(headerTTL)
+ timeout.Reset(d.requestTTL())

if skeleton {
glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from)
@@ -1311,13 +1332,13 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
pack := packet.(*headerPack)
return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh)
}
- expire = func() map[string]int { return d.queue.ExpireHeaders(headerTTL) }
+ expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
throttle = func() bool { return false }
reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveHeaders(p, count), false, nil
}
fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
- capacity = func(p *peer) int { return p.HeaderCapacity() }
+ capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
)
err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
@@ -1341,9 +1362,9 @@ func (d *Downloader) fetchBodies(from uint64) error {
pack := packet.(*bodyPack)
return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
}
- expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) }
+ expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
- capacity = func(p *peer) int { return p.BlockCapacity() }
+ capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
)
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
@@ -1365,9 +1386,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
pack := packet.(*receiptPack)
return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
}
- expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) }
+ expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
- capacity = func(p *peer) int { return p.ReceiptCapacity() }
+ capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
)
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
@@ -1417,13 +1438,13 @@ func (d *Downloader) fetchNodeData() error {
}
})
}
- expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }
+ expire = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) }
throttle = func() bool { return false }
reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveNodeData(p, count), false, nil
}
fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
- capacity = func(p *peer) int { return p.NodeDataCapacity() }
+ capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
)
err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
@@ -1799,8 +1820,10 @@ func (d *Downloader) processContent() error {
}
for len(results) != 0 {
// Check for any termination requests
- if atomic.LoadInt32(&d.interrupt) == 1 {
+ select {
+ case <-d.quitCh:
return errCancelContentProcessing
+ default:
}
// Retrieve the next batch of results to import
var (
@@ -1901,3 +1924,74 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i
return errNoSyncActive
}
}
+
+ // qosTuner is the quality of service tuning loop that occasionally gathers the
+ // peer latency statistics and updates the estimated request round trip time.
+ func (d *Downloader) qosTuner() {
+ for {
+ // Retrieve the current median RTT and integrate into the previous target RTT
+ rtt := time.Duration(float64(1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
+ atomic.StoreUint64(&d.rttEstimate, uint64(rtt))
+
+ // A new RTT cycle passed, increase our confidence in the estimated RTT
+ conf := atomic.LoadUint64(&d.rttConfidence)
+ conf = conf + (1000000-conf)/2
+ atomic.StoreUint64(&d.rttConfidence, conf)
+
+ // Log the new QoS values and sleep until the next RTT
+ glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
+ select {
+ case <-d.quitCh:
+ return
+ case <-time.After(rtt):
+ }
+ }
+ }
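The tuner is an exponentially weighted moving average: with qosTuningImpact = 0.25, each cycle keeps three quarters of the old estimate and folds in one quarter of the live median, so the estimate converges geometrically and a single outlier cycle cannot spike the timeouts. A standalone trace of that update rule under an assumed steady 2s median:

package main

import (
	"fmt"
	"time"
)

const qosTuningImpact = 0.25 // weight of the newest median sample

func main() {
	// Start from the pessimistic rttMaxEstimate and feed in a steady
	// 2s median, as a stable set of peers would produce.
	estimate, median := 20*time.Second, 2*time.Second
	for i := 0; i < 5; i++ {
		estimate = time.Duration((1-qosTuningImpact)*float64(estimate) + qosTuningImpact*float64(median))
		fmt.Println(estimate) // 15.5s, 12.125s, 9.59375s, ...
	}
}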
+
+ // qosReduceConfidence is meant to be called when a new peer joins the downloader's
+ // peer set, to reduce the confidence we have in our QoS estimates.
+ func (d *Downloader) qosReduceConfidence() {
+ // If we have a single peer, confidence is always 1
+ peers := uint64(d.peers.Len())
+ if peers == 1 {
+ atomic.StoreUint64(&d.rttConfidence, 1000000)
+ return
+ }
+ // If we have a ton of peers, don't drop confidence
+ if peers >= uint64(qosConfidenceCap) {
+ return
+ }
+ // Otherwise drop the confidence factor
+ conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
+ if float64(conf)/1000000 < rttMinConfidence {
+ conf = uint64(rttMinConfidence * 1000000)
+ }
+ atomic.StoreUint64(&d.rttConfidence, conf)
+
+ rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
+ glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
+ }
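Each joining peer scales the confidence by (peers-1)/peers, so timeouts widen while an untested peer could be skewing the RTT picture: successive joins telescope toward 1/n, and the rttMinConfidence floor keeps requestTTL bounded. The arithmetic on plain floats (the real code operates on the atomic millionths value):

package main

import "fmt"

const rttMinConfidence = 0.1 // floor from the diff

func main() {
	conf := 1.0
	// Peers joining one by one, each applying conf *= (n-1)/n:
	for n := 2.0; n <= 6; n++ {
		conf *= (n - 1) / n
		fmt.Printf("%d peers: conf %.3f\n", int(n), conf)
	}
	// After five joins the product telescopes: 1/2 * 2/3 * 3/4 * 4/5 * 5/6 = 1/6
	if conf < rttMinConfidence {
		conf = rttMinConfidence // still above the 0.1 floor here
	}
	fmt.Printf("final: %.3f\n", conf) // 0.167
}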
+
+ // requestRTT returns the current target round trip time for a download request
+ // to complete in.
+ //
+ // Note, the returned RTT is 0.9 of the actual RTT estimate. The reason is that
+ // the downloader tries to adapt queries to the RTT, so multiple RTT values can
+ // be adapted to, but smaller ones are preferred (stabler download stream).
+ func (d *Downloader) requestRTT() time.Duration {
+ return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10
+ }
+
+ // requestTTL returns the current timeout allowance for a single download request
+ // to finish under.
+ func (d *Downloader) requestTTL() time.Duration {
+ var (
+ rtt = time.Duration(atomic.LoadUint64(&d.rttEstimate))
+ conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0
+ )
+ ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
+ if ttl > ttlLimit {
+ ttl = ttlLimit
+ }
+ return ttl
+ }
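Together the two accessors split one estimate into two knobs: requests are sized against an optimistic 90% of the RTT (so replies tend to arrive slightly early and the stream stays smooth), but are only abandoned after the confidence-inflated, clamped TTL. The same arithmetic reproduced on plain values, with assumed inputs:

package main

import (
	"fmt"
	"time"
)

const (
	ttlScaling = 3
	ttlLimit   = time.Minute
)

// rttAndTTL mirrors requestRTT/requestTTL: the request target is 9/10 of
// the estimate, the timeout is the confidence-inflated, clamped multiple.
func rttAndTTL(estimate time.Duration, conf float64) (rtt, ttl time.Duration) {
	rtt = estimate * 9 / 10
	ttl = time.Duration(ttlScaling) * time.Duration(float64(estimate)/conf)
	if ttl > ttlLimit {
		ttl = ttlLimit
	}
	return rtt, ttl
}

func main() {
	for _, c := range []float64{1.0, 0.5, 0.1} {
		rtt, ttl := rttAndTTL(2*time.Second, c)
		fmt.Printf("conf %.1f: target %v, timeout %v\n", c, rtt, ttl)
	}
	// conf 1.0: target 1.8s, timeout 6s
	// conf 0.5: target 1.8s, timeout 12s
	// conf 0.1: target 1.8s, timeout 1m0s (clamped by ttlLimit)
}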