Skip to content

Commit 680a632

Browse files
craig[bot]jbowens
andcommitted
Merge #157950
157950: storage: replace Pebble.async with WaitGroup.Go r=sumeerbhola a=jbowens The storage.Pebble type's async method implemented exactly the behavior of the new WaitGroup Go method. Epic: none Release note: none Co-authored-by: Jackson Owens <[email protected]>
2 parents 834f678 + c27a0ca commit 680a632

File tree

2 files changed

+27
-50
lines changed

2 files changed

+27
-50
lines changed

pkg/storage/pebble.go

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1137,9 +1137,10 @@ func newPebble(ctx context.Context, cfg engineConfig) (p *Pebble, err error) {
11371137
cfg.opts.CompactionConcurrencyRange = p.cco.Wrap(cfg.opts.CompactionConcurrencyRange)
11381138

11391139
p.diskUnhealthyTracker = diskUnhealthyTracker{
1140-
st: cfg.settings,
1141-
asyncRunner: p,
1142-
ts: timeutil.DefaultTimeSource{},
1140+
st: cfg.settings,
1141+
isClosed: p.Closed,
1142+
runAsync: p.asyncDone.Go,
1143+
ts: timeutil.DefaultTimeSource{},
11431144
}
11441145
// NB: The ordering of the event listeners passed to TeeEventListener is
11451146
// deliberate. The listener returned by makeMetricEtcEventListener is
@@ -1163,7 +1164,7 @@ func newPebble(ctx context.Context, cfg engineConfig) (p *Pebble, err error) {
11631164
// confusing.
11641165
cfg.env.RegisterOnDiskSlow(func(info pebble.DiskSlowInfo) {
11651166
el := cfg.opts.EventListener
1166-
p.async(func() { el.DiskSlow(info) })
1167+
p.asyncDone.Go(func() { el.DiskSlow(info) })
11671168
})
11681169
el := pebble.TeeEventListener(
11691170
p.makeMetricEtcEventListener(logCtx),
@@ -1289,17 +1290,6 @@ func category(name string, upperBound roachpb.Key) pebble.UserKeyCategory {
12891290
return pebble.UserKeyCategory{Name: name, UpperBound: ek.Encode()}
12901291
}
12911292

1292-
// async launches the provided function in a new goroutine. It uses a wait group
1293-
// to synchronize with (*Pebble).Close to ensure all launched goroutines have
1294-
// exited before Close returns.
1295-
func (p *Pebble) async(fn func()) {
1296-
p.asyncDone.Add(1)
1297-
go func() {
1298-
defer p.asyncDone.Done()
1299-
fn()
1300-
}()
1301-
}
1302-
13031293
// writePreventStartupFile creates a file that will prevent nodes from automatically restarting after
13041294
// experiencing sstable corruption.
13051295
func (p *Pebble) writePreventStartupFile(ctx context.Context, corruptionError error) {
@@ -1386,11 +1376,11 @@ func (p *Pebble) makeMetricEtcEventListener(ctx context.Context) pebble.EventLis
13861376
}
13871377
} else {
13881378
if p.cfg.diskMonitor != nil {
1389-
p.async(func() {
1379+
p.asyncDone.Go(func() {
13901380
log.Dev.Errorf(ctx, "disk stall detected: %s\n%s", info, p.cfg.diskMonitor.LogTrace())
13911381
})
13921382
} else {
1393-
p.async(func() { log.Dev.Errorf(ctx, "disk stall detected: %s", info) })
1383+
p.asyncDone.Go(func() { log.Dev.Errorf(ctx, "disk stall detected: %s", info) })
13941384
}
13951385
}
13961386
return
@@ -3183,17 +3173,12 @@ func (cco *compactionConcurrencyOverride) Wrap(
31833173
// explicitly.
31843174
const diskUnhealthyResetInterval = 5 * time.Second
31853175

3186-
// asyncRunner abstracts Pebble.async for testing.
3187-
type asyncRunner interface {
3188-
async(fn func())
3189-
Closed() bool
3190-
}
3191-
31923176
type diskUnhealthyTracker struct {
3193-
st *cluster.Settings
3194-
asyncRunner asyncRunner
3195-
ts timeutil.TimeSource
3196-
mu struct {
3177+
st *cluster.Settings
3178+
isClosed func() bool
3179+
runAsync func(fn func())
3180+
ts timeutil.TimeSource
3181+
mu struct {
31973182
syncutil.Mutex
31983183
lastUnhealthyEventTime time.Time
31993184
currentlyUnhealthy bool
@@ -3219,7 +3204,7 @@ func (dut *diskUnhealthyTracker) onDiskSlow(info pebble.DiskSlowInfo) {
32193204
if !dut.mu.currentlyUnhealthy {
32203205
dut.mu.currentlyUnhealthy = true
32213206
dut.mu.lastUnhealthySampleTime = now
3222-
dut.asyncRunner.async(func() {
3207+
dut.runAsync(func() {
32233208
// Reset the unhealthy status after a while.
32243209
ticker := dut.ts.NewTicker(diskUnhealthyResetInterval)
32253210
defer ticker.Stop()
@@ -3232,7 +3217,7 @@ func (dut *diskUnhealthyTracker) onDiskSlow(info pebble.DiskSlowInfo) {
32323217
tickReceivedForTesting(now)
32333218
for {
32343219
now := <-ticker.Ch()
3235-
isClosed := dut.asyncRunner.Closed()
3220+
isClosed := dut.isClosed()
32363221
dut.mu.Lock()
32373222
if !dut.mu.currentlyUnhealthy {
32383223
panic(errors.AssertionFailedf("unexpected currentlyUnhealthy=false"))

pkg/storage/pebble_test.go

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1911,22 +1911,6 @@ func TestPebbleSpanPolicyFunc(t *testing.T) {
19111911
}
19121912
}
19131913

1914-
type testAsyncRunner struct {
1915-
b *strings.Builder
1916-
closed atomic.Bool
1917-
}
1918-
1919-
func (r *testAsyncRunner) async(fn func()) {
1920-
fmt.Fprintf(r.b, "asyncRunner.async\n")
1921-
go fn()
1922-
}
1923-
1924-
func (r *testAsyncRunner) Closed() bool {
1925-
closed := r.closed.Load()
1926-
fmt.Fprintf(r.b, "asyncRunner.Closed(): %t\n", closed)
1927-
return closed
1928-
}
1929-
19301914
func TestDiskUnhealthyTracker(t *testing.T) {
19311915
defer leaktest.AfterTest(t)()
19321916
defer log.Scope(t).Close(t)
@@ -1937,14 +1921,22 @@ func TestDiskUnhealthyTracker(t *testing.T) {
19371921
b.Reset()
19381922
return str
19391923
}
1940-
runner := &testAsyncRunner{b: &b}
1924+
var isClosed atomic.Bool
19411925
ts := timeutil.NewManualTime(time.Unix(0, 0))
19421926
st := cluster.MakeTestingClusterSettings()
1943-
UnhealthyWriteDuration.Override(context.Background(), &st.SV, 5*time.Second)
1927+
UnhealthyWriteDuration.Override(t.Context(), &st.SV, 5*time.Second)
19441928
tickReceivedCh := make(chan time.Time, 1)
19451929
tracker := &diskUnhealthyTracker{
1946-
st: st,
1947-
asyncRunner: runner,
1930+
st: st,
1931+
isClosed: func() bool {
1932+
closed := isClosed.Load()
1933+
fmt.Fprintf(&b, "asyncRunner.Closed(): %t\n", closed)
1934+
return closed
1935+
},
1936+
runAsync: func(fn func()) {
1937+
fmt.Fprintf(&b, "asyncRunner.async\n")
1938+
go fn()
1939+
},
19481940
ts: ts,
19491941
testingTickReceivedCh: tickReceivedCh,
19501942
}
@@ -1974,7 +1966,7 @@ func TestDiskUnhealthyTracker(t *testing.T) {
19741966
return builderStr()
19751967

19761968
case "close":
1977-
runner.closed.Store(true)
1969+
isClosed.Store(true)
19781970
return ""
19791971

19801972
default:

0 commit comments

Comments
 (0)