Skip to content

Commit e4716bc

Browse files
committed
refactor: subfetchers track outstanding tasks
1 parent 12e1a9c commit e4716bc

File tree

3 files changed

+41
-25
lines changed

3 files changed

+41
-25
lines changed

core/state/trie_prefetcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string, opts ...
7979
// close iterates over all the subfetchers, aborts any that were left spinning
8080
// and reports the stats to the metrics subsystem.
8181
func (p *triePrefetcher) close() {
82-
p.abortFetchersConcurrently()
82+
p.abortFetchersAndReleaseWorkerPools()
8383
for _, fetcher := range p.fetchers {
8484
fetcher.abort() // safe to do multiple times
8585

core/state/trie_prefetcher.libevm.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@ type prefetcherConfig struct {
3131
newWorkers func() WorkerPool
3232
}
3333

34-
// A WorkerPool executes functions asynchronously.
34+
// A WorkerPool executes functions asynchronously. Done() is called to signal
35+
// that the pool is no longer needed and that Execute() is guaranteed to not be
36+
// called again.
3537
type WorkerPool interface {
3638
Execute(func())
37-
Wait()
39+
Done()
3840
}
3941

4042
// WithWorkerPools configures trie prefetching to execute asynchronously. The
@@ -49,6 +51,7 @@ func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption {
4951
type subfetcherPool struct {
5052
workers WorkerPool
5153
tries sync.Pool
54+
wg sync.WaitGroup
5255
}
5356

5457
// applyTo configures the [subfetcher] to use a [WorkerPool] if one was provided
@@ -68,10 +71,9 @@ func (c *prefetcherConfig) applyTo(sf *subfetcher) {
6871
}
6972
}
7073

71-
// abortFetchersConcurrently calls [subfetcher.abort] on every fetcher, blocking
72-
// until all return. Calling abort() sequentially may result in later fetchers
73-
// accepting new work in the interim.
74-
func (p *triePrefetcher) abortFetchersConcurrently() {
74+
func (p *triePrefetcher) abortFetchersAndReleaseWorkerPools() {
75+
// Calling abort() sequentially may result in later fetchers accepting new
76+
// work in the interim.
7577
var wg sync.WaitGroup
7678
for _, f := range p.fetchers {
7779
wg.Add(1)
@@ -80,26 +82,35 @@ func (p *triePrefetcher) abortFetchersConcurrently() {
8082
wg.Done()
8183
}(f)
8284
}
85+
86+
// A WorkerPool is allowed to be shared between fetchers so we MUST wait for
87+
// them to finish all tasks otherwise they could call Execute() after
88+
// Done(), which we guarantee in the public API to be impossible.
8389
wg.Wait()
90+
for _, f := range p.fetchers {
91+
if w := f.pool.workers; w != nil {
92+
w.Done()
93+
}
94+
}
8495
}
8596

8697
func (p *subfetcherPool) wait() {
87-
if p == nil || p.workers == nil {
88-
return
89-
}
90-
p.workers.Wait()
98+
p.wg.Wait()
9199
}
92100

93101
// execute runs the provided function with a copy of the subfetcher's Trie.
94102
// Copies are stored in a [sync.Pool] to reduce creation overhead. If p was
95103
// configured with a [WorkerPool] then it is used for function execution,
96104
// otherwise `fn` is just called directly.
97105
func (p *subfetcherPool) execute(fn func(Trie)) {
106+
p.wg.Add(1)
98107
do := func() {
99108
t := p.tries.Get().(Trie)
100109
fn(t)
101110
p.tries.Put(t)
111+
p.wg.Done()
102112
}
113+
103114
if w := p.workers; w != nil {
104115
w.Execute(do)
105116
} else {

core/state/trie_prefetcher.libevm_test.go

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,28 +21,39 @@ import (
2121
"time"
2222

2323
"github.com/ava-labs/libevm/common"
24+
"github.com/stretchr/testify/assert"
2425
)
2526

2627
type synchronisingWorkerPool struct {
27-
executed, unblock chan struct{}
28+
t *testing.T
29+
executed, unblock chan struct{}
30+
done bool
31+
preconditionsToStopPrefetcher int
2832
}
2933

3034
var _ WorkerPool = (*synchronisingWorkerPool)(nil)
3135

32-
func (p *synchronisingWorkerPool) Execute(func()) {
36+
func (p *synchronisingWorkerPool) Execute(fn func()) {
37+
fn()
3338
select {
3439
case <-p.executed:
3540
default:
3641
close(p.executed)
3742
}
38-
}
3943

40-
func (p *synchronisingWorkerPool) Wait() {
4144
<-p.unblock
45+
assert.False(p.t, p.done, "Done() called before Execute() returns")
46+
p.preconditionsToStopPrefetcher++
47+
}
48+
49+
func (p *synchronisingWorkerPool) Done() {
50+
p.done = true
51+
p.preconditionsToStopPrefetcher++
4252
}
4353

4454
func TestStopPrefetcherWaitsOnWorkers(t *testing.T) {
4555
pool := &synchronisingWorkerPool{
56+
t: t,
4657
executed: make(chan struct{}),
4758
unblock: make(chan struct{}),
4859
}
@@ -55,20 +66,14 @@ func TestStopPrefetcherWaitsOnWorkers(t *testing.T) {
5566
go func() {
5667
<-pool.executed
5768
// Sleep otherwise there is a small chance that we close pool.unblock
58-
// between db.StopPrefetcher() returning and the select receiving on the
59-
// channel.
69+
// between db.StopPrefetcher() returning and the assertion.
6070
time.Sleep(time.Second)
6171
close(pool.unblock)
6272
}()
6373

6474
<-pool.executed
6575
db.StopPrefetcher()
66-
select {
67-
case <-pool.unblock:
68-
// The channel was closed, therefore pool.Wait() unblocked. This is a
69-
// necessary pre-condition for db.StopPrefetcher() unblocking, and the
70-
// purpose of this test.
71-
default:
72-
t.Errorf("%T.StopPrefetcher() returned before %T.Wait() unblocked", db, pool)
73-
}
76+
// If this happens then either Execute() hadn't returned or Done() wasn't
77+
// called.
78+
assert.Equalf(t, 2, pool.preconditionsToStopPrefetcher, "%T.StopPrefetcher() returned early", db)
7479
}

0 commit comments

Comments
 (0)