Skip to content

Commit 12e1a9c

Browse files
committed
test: StateDB.StopPrefetcher() blocks on WorkerPool.Wait()
1 parent ab480f0 commit 12e1a9c

File tree

3 files changed

+94
-2
lines changed

3 files changed

+94
-2
lines changed

core/state/trie_prefetcher.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string, opts ...
7979
// close iterates over all the subfetchers, aborts any that were left spinning
8080
// and reports the stats to the metrics subsystem.
8181
func (p *triePrefetcher) close() {
82+
p.abortFetchersConcurrently()
8283
for _, fetcher := range p.fetchers {
8384
fetcher.abort() // safe to do multiple times
8485

@@ -303,9 +304,11 @@ func (sf *subfetcher) abort() {
303304
// loop waits for new tasks to be scheduled and keeps loading them until it runs
304305
// out of tasks or its underlying trie is retrieved for committing.
305306
func (sf *subfetcher) loop() {
306-
defer sf.pool.wait()
307307
// No matter how the loop stops, signal anyone waiting that it's terminated
308-
defer close(sf.term)
308+
defer func() {
309+
sf.pool.wait()
310+
close(sf.term)
311+
}()
309312

310313
// Start by opening the trie and stop processing if it fails
311314
if sf.owner == (common.Hash{}) {

core/state/trie_prefetcher.libevm.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,21 @@ func (c *prefetcherConfig) applyTo(sf *subfetcher) {
6868
}
6969
}
7070

71+
// abortFetchersConcurrently calls [subfetcher.abort] on every fetcher, blocking
72+
// until all return. Calling abort() sequentially may result in later fetchers
73+
// accepting new work in the interim.
74+
func (p *triePrefetcher) abortFetchersConcurrently() {
75+
var wg sync.WaitGroup
76+
for _, f := range p.fetchers {
77+
wg.Add(1)
78+
go func(f *subfetcher) {
79+
f.abort()
80+
wg.Done()
81+
}(f)
82+
}
83+
wg.Wait()
84+
}
85+
7186
func (p *subfetcherPool) wait() {
7287
if p == nil || p.workers == nil {
7388
return
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2024 the libevm authors.
2+
//
3+
// The libevm additions to go-ethereum are free software: you can redistribute
4+
// them and/or modify them under the terms of the GNU Lesser General Public License
5+
// as published by the Free Software Foundation, either version 3 of the License,
6+
// or (at your option) any later version.
7+
//
8+
// The libevm additions are distributed in the hope that they will be useful,
9+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
11+
// General Public License for more details.
12+
//
13+
// You should have received a copy of the GNU Lesser General Public License
14+
// along with the go-ethereum library. If not, see
15+
// <http://www.gnu.org/licenses/>.
16+
17+
package state
18+
19+
import (
20+
"testing"
21+
"time"
22+
23+
"github.com/ava-labs/libevm/common"
24+
)
25+
26+
type synchronisingWorkerPool struct {
27+
executed, unblock chan struct{}
28+
}
29+
30+
var _ WorkerPool = (*synchronisingWorkerPool)(nil)
31+
32+
func (p *synchronisingWorkerPool) Execute(func()) {
33+
select {
34+
case <-p.executed:
35+
default:
36+
close(p.executed)
37+
}
38+
}
39+
40+
func (p *synchronisingWorkerPool) Wait() {
41+
<-p.unblock
42+
}
43+
44+
func TestStopPrefetcherWaitsOnWorkers(t *testing.T) {
45+
pool := &synchronisingWorkerPool{
46+
executed: make(chan struct{}),
47+
unblock: make(chan struct{}),
48+
}
49+
opt := WithWorkerPools(func() WorkerPool { return pool })
50+
51+
db := filledStateDB()
52+
db.prefetcher = newTriePrefetcher(db.db, db.originalRoot, "", opt)
53+
db.prefetcher.prefetch(common.Hash{}, common.Hash{}, common.Address{}, [][]byte{{}})
54+
55+
go func() {
56+
<-pool.executed
57+
// Sleep otherwise there is a small chance that we close pool.unblock
58+
// between db.StopPrefetcher() returning and the select receiving on the
59+
// channel.
60+
time.Sleep(time.Second)
61+
close(pool.unblock)
62+
}()
63+
64+
<-pool.executed
65+
db.StopPrefetcher()
66+
select {
67+
case <-pool.unblock:
68+
// The channel was closed, therefore pool.Wait() unblocked. This is a
69+
// necessary pre-condition for db.StopPrefetcher() unblocking, and the
70+
// purpose of this test.
71+
default:
72+
t.Errorf("%T.StopPrefetcher() returned before %T.Wait() unblocked", db, pool)
73+
}
74+
}

0 commit comments

Comments
 (0)