Skip to content

Commit 3010f9f

Browse files
authored
eth/downloader: change intial download size (#21366)
This changes how the downloader works, a little bit. Previously, when block sync started, we immediately started filling up to 8192 blocks. Usually this is fine, blocks are small in the early numbers. The threshold then is lowered as we measure the size of the blocks that are filled. However, if the node is shut down and restarts syncing while we're in a heavy segment, that might be bad. This PR introduces a more conservative initial threshold of 2K blocks instead.
1 parent d90bbce commit 3010f9f

File tree

5 files changed

+26
-24
lines changed

5 files changed

+26
-24
lines changed

eth/downloader/downloader.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
219219
stateBloom: stateBloom,
220220
mux: mux,
221221
checkpoint: checkpoint,
222-
queue: newQueue(blockCacheItems),
222+
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
223223
peers: newPeerSet(),
224224
rttEstimate: uint64(rttMaxEstimate),
225225
rttConfidence: uint64(1000000),
@@ -379,7 +379,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
379379
d.stateBloom.Close()
380380
}
381381
// Reset the queue, peer set and wake channels to clean any internal leftover state
382-
d.queue.Reset(blockCacheItems)
382+
d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems)
383383
d.peers.Reset()
384384

385385
for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {

eth/downloader/downloader_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import (
3939
func init() {
4040
fullMaxForkAncestry = 10000
4141
lightMaxForkAncestry = 10000
42-
blockCacheItems = 1024
42+
blockCacheMaxItems = 1024
4343
fsHeaderContCheck = 500 * time.Millisecond
4444
}
4545

@@ -544,7 +544,7 @@ func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
544544
defer tester.terminate()
545545

546546
// Create a small enough block chain to download
547-
chain := testChainBase.shorten(blockCacheItems - 15)
547+
chain := testChainBase.shorten(blockCacheMaxItems - 15)
548548
tester.newPeer("peer", protocol, chain)
549549

550550
// Synchronise with the peer and make sure all relevant data was retrieved
@@ -607,8 +607,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
607607
}
608608
tester.lock.Unlock()
609609

610-
if cached == blockCacheItems ||
611-
cached == blockCacheItems-reorgProtHeaderDelay ||
610+
if cached == blockCacheMaxItems ||
611+
cached == blockCacheMaxItems-reorgProtHeaderDelay ||
612612
retrieved+cached+frozen == targetBlocks+1 ||
613613
retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay {
614614
break
@@ -619,8 +619,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
619619
tester.lock.RLock()
620620
retrieved = len(tester.ownBlocks)
621621
tester.lock.RUnlock()
622-
if cached != blockCacheItems && cached != blockCacheItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay {
623-
t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1)
622+
if cached != blockCacheMaxItems && cached != blockCacheMaxItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay {
623+
t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheMaxItems, retrieved, frozen, targetBlocks+1)
624624
}
625625

626626
// Permit the blocked blocks to import
@@ -873,7 +873,7 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
873873
defer tester.terminate()
874874

875875
// Create a small enough block chain to download
876-
chain := testChainBase.shorten(blockCacheItems - 15)
876+
chain := testChainBase.shorten(blockCacheMaxItems - 15)
877877

878878
// Create peers of every type
879879
tester.newPeer("peer 63", 63, chain)
@@ -965,7 +965,7 @@ func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
965965
tester := newTester()
966966
defer tester.terminate()
967967

968-
chain := testChainBase.shorten(blockCacheItems - 15)
968+
chain := testChainBase.shorten(blockCacheMaxItems - 15)
969969
brokenChain := chain.shorten(chain.len())
970970
delete(brokenChain.headerm, brokenChain.chain[brokenChain.len()/2])
971971
tester.newPeer("attack", protocol, brokenChain)
@@ -997,7 +997,7 @@ func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
997997
tester := newTester()
998998
defer tester.terminate()
999999

1000-
chain := testChainBase.shorten(blockCacheItems - 15)
1000+
chain := testChainBase.shorten(blockCacheMaxItems - 15)
10011001

10021002
// Attempt a full sync with an attacker feeding shifted headers
10031003
brokenChain := chain.shorten(chain.len())
@@ -1202,7 +1202,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
12021202

12031203
tester := newTester()
12041204
defer tester.terminate()
1205-
chain := testChainBase.shorten(blockCacheItems - 15)
1205+
chain := testChainBase.shorten(blockCacheMaxItems - 15)
12061206

12071207
// Set a sync init hook to catch progress changes
12081208
starting := make(chan struct{})
@@ -1362,7 +1362,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
13621362

13631363
tester := newTester()
13641364
defer tester.terminate()
1365-
chain := testChainBase.shorten(blockCacheItems - 15)
1365+
chain := testChainBase.shorten(blockCacheMaxItems - 15)
13661366

13671367
// Set a sync init hook to catch progress changes
13681368
starting := make(chan struct{})
@@ -1435,7 +1435,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
14351435

14361436
tester := newTester()
14371437
defer tester.terminate()
1438-
chain := testChainBase.shorten(blockCacheItems - 15)
1438+
chain := testChainBase.shorten(blockCacheMaxItems - 15)
14391439

14401440
// Set a sync init hook to catch progress changes
14411441
starting := make(chan struct{})

eth/downloader/queue.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,10 @@ const (
4040
)
4141

4242
var (
43-
blockCacheItems = 8192 // Maximum number of blocks to cache before throttling the download
44-
blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching
45-
blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones
43+
blockCacheMaxItems = 8192 // Maximum number of blocks to cache before throttling the download
44+
blockCacheInitialItems = 2048 // Initial number of blocks to start fetching, before we know the sizes of the blocks
45+
blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching
46+
blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones
4647
)
4748

4849
var (
@@ -142,7 +143,7 @@ type queue struct {
142143
}
143144

144145
// newQueue creates a new download queue for scheduling block retrieval.
145-
func newQueue(blockCacheLimit int) *queue {
146+
func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue {
146147
lock := new(sync.RWMutex)
147148
q := &queue{
148149
headerContCh: make(chan bool),
@@ -151,12 +152,12 @@ func newQueue(blockCacheLimit int) *queue {
151152
active: sync.NewCond(lock),
152153
lock: lock,
153154
}
154-
q.Reset(blockCacheLimit)
155+
q.Reset(blockCacheLimit, thresholdInitialSize)
155156
return q
156157
}
157158

158159
// Reset clears out the queue contents.
159-
func (q *queue) Reset(blockCacheLimit int) {
160+
func (q *queue) Reset(blockCacheLimit int, thresholdInitialSize int) {
160161
q.lock.Lock()
161162
defer q.lock.Unlock()
162163

@@ -175,6 +176,7 @@ func (q *queue) Reset(blockCacheLimit int) {
175176
q.receiptPendPool = make(map[string]*fetchRequest)
176177

177178
q.resultCache = newResultStore(blockCacheLimit)
179+
q.resultCache.SetThrottleThreshold(uint64(thresholdInitialSize))
178180
}
179181

180182
// Close marks the end of the sync, unblocking Results.

eth/downloader/queue_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func dummyPeer(id string) *peerConnection {
9797
}
9898

9999
func TestBasics(t *testing.T) {
100-
q := newQueue(10)
100+
q := newQueue(10, 10)
101101
if !q.Idle() {
102102
t.Errorf("new queue should be idle")
103103
}
@@ -174,7 +174,7 @@ func TestBasics(t *testing.T) {
174174
}
175175

176176
func TestEmptyBlocks(t *testing.T) {
177-
q := newQueue(10)
177+
q := newQueue(10, 10)
178178

179179
q.Prepare(1, FastSync)
180180
// Schedule a batch of headers
@@ -244,7 +244,7 @@ func XTestDelivery(t *testing.T) {
244244
log.Root().SetHandler(log.StdoutHandler)
245245

246246
}
247-
q := newQueue(10)
247+
q := newQueue(10, 10)
248248
var wg sync.WaitGroup
249249
q.Prepare(1, FastSync)
250250
wg.Add(1)

eth/downloader/testchain_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ var (
3939
)
4040

4141
// The common prefix of all test chains:
42-
var testChainBase = newTestChain(blockCacheItems+200, testGenesis)
42+
var testChainBase = newTestChain(blockCacheMaxItems+200, testGenesis)
4343

4444
// Different forks on top of the base chain:
4545
var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain

0 commit comments

Comments
 (0)