Skip to content

Commit c27a0ca

Browse files
committed
storage: replace Pebble.async with WaitGroup.Go
The storage.Pebble type's async method implemented exactly the behavior of the new WaitGroup Go method. Epic: none Release note: none
1 parent e837db7 commit c27a0ca

File tree

2 files changed

+27
-50
lines changed

2 files changed

+27
-50
lines changed

pkg/storage/pebble.go

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1137,9 +1137,10 @@ func newPebble(ctx context.Context, cfg engineConfig) (p *Pebble, err error) {
11371137
cfg.opts.CompactionConcurrencyRange = p.cco.Wrap(cfg.opts.CompactionConcurrencyRange)
11381138

11391139
p.diskUnhealthyTracker = diskUnhealthyTracker{
1140-
st: cfg.settings,
1141-
asyncRunner: p,
1142-
ts: timeutil.DefaultTimeSource{},
1140+
st: cfg.settings,
1141+
isClosed: p.Closed,
1142+
runAsync: p.asyncDone.Go,
1143+
ts: timeutil.DefaultTimeSource{},
11431144
}
11441145
// NB: The ordering of the event listeners passed to TeeEventListener is
11451146
// deliberate. The listener returned by makeMetricEtcEventListener is
@@ -1163,7 +1164,7 @@ func newPebble(ctx context.Context, cfg engineConfig) (p *Pebble, err error) {
11631164
// confusing.
11641165
cfg.env.RegisterOnDiskSlow(func(info pebble.DiskSlowInfo) {
11651166
el := cfg.opts.EventListener
1166-
p.async(func() { el.DiskSlow(info) })
1167+
p.asyncDone.Go(func() { el.DiskSlow(info) })
11671168
})
11681169
el := pebble.TeeEventListener(
11691170
p.makeMetricEtcEventListener(logCtx),
@@ -1289,17 +1290,6 @@ func category(name string, upperBound roachpb.Key) pebble.UserKeyCategory {
12891290
return pebble.UserKeyCategory{Name: name, UpperBound: ek.Encode()}
12901291
}
12911292

1292-
// async launches the provided function in a new goroutine. It uses a wait group
1293-
// to synchronize with (*Pebble).Close to ensure all launched goroutines have
1294-
// exited before Close returns.
1295-
func (p *Pebble) async(fn func()) {
1296-
p.asyncDone.Add(1)
1297-
go func() {
1298-
defer p.asyncDone.Done()
1299-
fn()
1300-
}()
1301-
}
1302-
13031293
// writePreventStartupFile creates a file that will prevent nodes from automatically restarting after
13041294
// experiencing sstable corruption.
13051295
func (p *Pebble) writePreventStartupFile(ctx context.Context, corruptionError error) {
@@ -1386,11 +1376,11 @@ func (p *Pebble) makeMetricEtcEventListener(ctx context.Context) pebble.EventLis
13861376
}
13871377
} else {
13881378
if p.cfg.diskMonitor != nil {
1389-
p.async(func() {
1379+
p.asyncDone.Go(func() {
13901380
log.Dev.Errorf(ctx, "disk stall detected: %s\n%s", info, p.cfg.diskMonitor.LogTrace())
13911381
})
13921382
} else {
1393-
p.async(func() { log.Dev.Errorf(ctx, "disk stall detected: %s", info) })
1383+
p.asyncDone.Go(func() { log.Dev.Errorf(ctx, "disk stall detected: %s", info) })
13941384
}
13951385
}
13961386
return
@@ -3170,17 +3160,12 @@ func (cco *compactionConcurrencyOverride) Wrap(
31703160
// explicitly.
31713161
const diskUnhealthyResetInterval = 5 * time.Second
31723162

3173-
// asyncRunner abstracts Pebble.async for testing.
3174-
type asyncRunner interface {
3175-
async(fn func())
3176-
Closed() bool
3177-
}
3178-
31793163
type diskUnhealthyTracker struct {
3180-
st *cluster.Settings
3181-
asyncRunner asyncRunner
3182-
ts timeutil.TimeSource
3183-
mu struct {
3164+
st *cluster.Settings
3165+
isClosed func() bool
3166+
runAsync func(fn func())
3167+
ts timeutil.TimeSource
3168+
mu struct {
31843169
syncutil.Mutex
31853170
lastUnhealthyEventTime time.Time
31863171
currentlyUnhealthy bool
@@ -3206,7 +3191,7 @@ func (dut *diskUnhealthyTracker) onDiskSlow(info pebble.DiskSlowInfo) {
32063191
if !dut.mu.currentlyUnhealthy {
32073192
dut.mu.currentlyUnhealthy = true
32083193
dut.mu.lastUnhealthySampleTime = now
3209-
dut.asyncRunner.async(func() {
3194+
dut.runAsync(func() {
32103195
// Reset the unhealthy status after a while.
32113196
ticker := dut.ts.NewTicker(diskUnhealthyResetInterval)
32123197
defer ticker.Stop()
@@ -3219,7 +3204,7 @@ func (dut *diskUnhealthyTracker) onDiskSlow(info pebble.DiskSlowInfo) {
32193204
tickReceivedForTesting(now)
32203205
for {
32213206
now := <-ticker.Ch()
3222-
isClosed := dut.asyncRunner.Closed()
3207+
isClosed := dut.isClosed()
32233208
dut.mu.Lock()
32243209
if !dut.mu.currentlyUnhealthy {
32253210
panic(errors.AssertionFailedf("unexpected currentlyUnhealthy=false"))

pkg/storage/pebble_test.go

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1911,22 +1911,6 @@ func TestPebbleSpanPolicyFunc(t *testing.T) {
19111911
}
19121912
}
19131913

1914-
type testAsyncRunner struct {
1915-
b *strings.Builder
1916-
closed atomic.Bool
1917-
}
1918-
1919-
func (r *testAsyncRunner) async(fn func()) {
1920-
fmt.Fprintf(r.b, "asyncRunner.async\n")
1921-
go fn()
1922-
}
1923-
1924-
func (r *testAsyncRunner) Closed() bool {
1925-
closed := r.closed.Load()
1926-
fmt.Fprintf(r.b, "asyncRunner.Closed(): %t\n", closed)
1927-
return closed
1928-
}
1929-
19301914
func TestDiskUnhealthyTracker(t *testing.T) {
19311915
defer leaktest.AfterTest(t)()
19321916
defer log.Scope(t).Close(t)
@@ -1937,14 +1921,22 @@ func TestDiskUnhealthyTracker(t *testing.T) {
19371921
b.Reset()
19381922
return str
19391923
}
1940-
runner := &testAsyncRunner{b: &b}
1924+
var isClosed atomic.Bool
19411925
ts := timeutil.NewManualTime(time.Unix(0, 0))
19421926
st := cluster.MakeTestingClusterSettings()
1943-
UnhealthyWriteDuration.Override(context.Background(), &st.SV, 5*time.Second)
1927+
UnhealthyWriteDuration.Override(t.Context(), &st.SV, 5*time.Second)
19441928
tickReceivedCh := make(chan time.Time, 1)
19451929
tracker := &diskUnhealthyTracker{
1946-
st: st,
1947-
asyncRunner: runner,
1930+
st: st,
1931+
isClosed: func() bool {
1932+
closed := isClosed.Load()
1933+
fmt.Fprintf(&b, "asyncRunner.Closed(): %t\n", closed)
1934+
return closed
1935+
},
1936+
runAsync: func(fn func()) {
1937+
fmt.Fprintf(&b, "asyncRunner.async\n")
1938+
go fn()
1939+
},
19481940
ts: ts,
19491941
testingTickReceivedCh: tickReceivedCh,
19501942
}
@@ -1974,7 +1966,7 @@ func TestDiskUnhealthyTracker(t *testing.T) {
19741966
return builderStr()
19751967

19761968
case "close":
1977-
runner.closed.Store(true)
1969+
isClosed.Store(true)
19781970
return ""
19791971

19801972
default:

0 commit comments

Comments
 (0)