Skip to content

Commit 15c9b15

Browse files
craig[bot], jbowens, and fqazi
committed
150867: storage/disk: deflake TestMonitorManager_monitorDisks r=RaduBerinde a=jbowens

**storage/disk: deflake TestMonitorManager_monitorDisks**

Use `require.Eventually` to deflake this test.

Epic: none
Fixes: #148556.
Release note: none

**storage/disk: use context cancellation to stop monitoring**

Address a pending TODO to use context cancellation, rather than an explicit stop channel, to stop monitoring of a disk.

Epic: none
Release note: none

150963: roachtest: deflake activerecord r=fqazi a=fqazi

Fixes: #150649
Fixes: #150570

Release note: None

Co-authored-by: Jackson Owens <[email protected]>
Co-authored-by: Faizan Qazi <[email protected]>
3 parents 5487d29 + e6e5837 + 4a3126a commit 15c9b15

File tree

3 files changed

+31
-30
lines changed

3 files changed

+31
-30
lines changed

pkg/cmd/roachtest/tests/activerecord_blocklist.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,17 @@ var activeRecordBlocklist = blocklist{
2424
var activeRecordIgnoreList = blocklist{
2525
`ActiveRecord::AdapterTestWithoutTransaction#test_create_with_query_cache`: "affected by autocommit_before_ddl",
2626
`ActiveRecord::CockroachDBStructureDumpTest#test_structure_dump`: "flaky",
27+
`ActiveRecord::ConnectionAdapters::ConnectionPoolThreadTest#test_checkout_fairness`: "flaky",
2728
`ActiveRecord::ConnectionAdapters::ConnectionPoolThreadTest#test_checkout_fairness_by_group`: "flaky",
2829
`ActiveRecord::ConnectionAdapters::PostgreSQLAdapterTest#test_translate_no_connection_exception_to_not_established`: "pg_terminate_backend not implemented",
2930
`ActiveRecord::Encryption::EncryptableRecordTest#test_by_default,_it's_case_sensitive`: "flaky",
3031
`ActiveRecord::Encryption::EncryptableRecordTest#test_forced_encoding_for_deterministic_attributes_will_replace_invalid_characters`: "flaky",
3132
`AssociationCallbacksTest#test_has_many_callbacks_for_destroy_on_parent`: "flaky",
3233
`BasicsTest#test_default_values_are_deeply_dupped`: "flaky",
3334
`CascadedEagerLoadingTest#test_eager_association_loading_with_cascaded_three_levels_by_ping_pong`: "flaky",
35+
`CockroachDB::AdapterForeignKeyTest#test_foreign_key_violations_are_translated_to_specific_exception_with_validate_false`: "flaky",
36+
`CockroachDB::AdapterForeignKeyTest#test_foreign_key_violations_on_delete_are_translated_to_specific_exception`: "flaky",
37+
`CockroachDB::AdapterForeignKeyTest#test_foreign_key_violations_on_insert_are_translated_to_specific_exception`: "flaky",
3438
`CockroachDB::FixturesTest#test_create_fixtures`: "flaky",
3539
`FixtureWithSetModelClassPrevailsOverNamingConventionTest#test_model_class_in_fixture_file_is_respected`: "flaky",
3640
`InheritanceTest#test_eager_load_belongs_to_primary_key_quoting`: "flaky",

pkg/storage/disk/monitor.go

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ type MonitorManager struct {
4949

5050
mu struct {
5151
syncutil.Mutex
52-
stop chan struct{}
53-
disks []*monitoredDisk
52+
cancel context.CancelFunc
53+
disks []*monitoredDisk
5454
}
5555
}
5656

@@ -90,13 +90,14 @@ func (m *MonitorManager) Monitor(path string) (*Monitor, error) {
9090

9191
// The design maintains the invariant that the disk stat polling loop
9292
// is always running unless there are no disks being monitored.
93-
if m.mu.stop == nil {
93+
if m.mu.cancel == nil {
9494
collector, err := newStatsCollector(m.fs)
9595
if err != nil {
9696
return nil, err
9797
}
98-
m.mu.stop = make(chan struct{})
99-
go m.monitorDisks(collector, m.mu.stop)
98+
ctx, cancel := context.WithCancel(context.Background())
99+
m.mu.cancel = cancel
100+
go m.monitorDisks(ctx, collector)
100101
}
101102
}
102103
disk.refCount++
@@ -105,7 +106,7 @@ func (m *MonitorManager) Monitor(path string) (*Monitor, error) {
105106
}
106107

107108
func (m *MonitorManager) unrefDisk(disk *monitoredDisk) {
108-
var stop chan struct{}
109+
var cancel context.CancelFunc
109110
func() {
110111
m.mu.Lock()
111112
defer m.mu.Unlock()
@@ -123,14 +124,14 @@ func (m *MonitorManager) unrefDisk(disk *monitoredDisk) {
123124
// If the MonitorManager has no disks left to monitor, the disk stat polling loop can
124125
// be stopped.
125126
if len(m.mu.disks) == 0 {
126-
stop = m.mu.stop
127-
m.mu.stop = nil
127+
cancel = m.mu.cancel
128+
m.mu.cancel = nil
128129
}
129130
}
130131
}()
131132

132-
if stop != nil {
133-
stop <- struct{}{}
133+
if cancel != nil {
134+
cancel()
134135
}
135136
}
136137

@@ -139,22 +140,15 @@ type statsCollector interface {
139140
}
140141

141142
// monitorDisks runs a loop collecting disk stats for all monitored disks.
142-
//
143-
// NB: A stop channel must be passed down to ensure that the function terminates during the
144-
// race where the MonitorManager creates a new stop channel after unrefDisk sends a message
145-
// across the old stop channel.
146-
func (m *MonitorManager) monitorDisks(collector statsCollector, stop chan struct{}) {
147-
// TODO(jackson): Should we propagate a context here to replace the stop
148-
// channel?
149-
ctx := context.TODO()
143+
// monitorDisks returns when the context is done.
144+
func (m *MonitorManager) monitorDisks(ctx context.Context, collector statsCollector) {
150145
ticker := time.NewTicker(DefaultDiskStatsPollingInterval)
151146
defer ticker.Stop()
152147

153148
every := log.Every(5 * time.Minute)
154149
for {
155150
select {
156-
case <-stop:
157-
close(stop)
151+
case <-ctx.Done():
158152
return
159153
case <-ticker.C:
160154
m.mu.Lock()

pkg/storage/disk/monitor_test.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
package disk
77

88
import (
9+
"context"
10+
"sync/atomic"
911
"testing"
1012
"time"
1113

@@ -17,13 +19,13 @@ import (
1719
)
1820

1921
type spyCollector struct {
20-
collectCallCount int
22+
collectCallCount atomic.Int32
2123
}
2224

2325
func (s *spyCollector) collect(
2426
disks []*monitoredDisk, now time.Time,
2527
) (countCollected int, err error) {
26-
s.collectCallCount++
28+
s.collectCallCount.Add(1)
2729
return len(disks), nil
2830
}
2931

@@ -42,12 +44,13 @@ func TestMonitorManager_monitorDisks(t *testing.T) {
4244
manager.mu.disks = []*monitoredDisk{testDisk}
4345

4446
testCollector := &spyCollector{}
45-
stop := make(chan struct{})
46-
go manager.monitorDisks(testCollector, stop)
47+
ctx, cancel := context.WithCancel(context.Background())
48+
defer cancel()
49+
go manager.monitorDisks(ctx, testCollector)
4750

48-
time.Sleep(2 * DefaultDiskStatsPollingInterval)
49-
stop <- struct{}{}
50-
require.Greater(t, testCollector.collectCallCount, 0)
51+
require.Eventually(t, func() bool {
52+
return testCollector.collectCallCount.Load() > 0
53+
}, 100*DefaultDiskStatsPollingInterval, DefaultDiskStatsPollingInterval)
5154
}
5255

5356
func TestMonitor_StatsWindow(t *testing.T) {
@@ -134,8 +137,8 @@ func TestMonitor_Close(t *testing.T) {
134137
},
135138
refCount: 2,
136139
}
137-
stop := make(chan struct{})
138-
manager.mu.stop = stop
140+
ctx, cancel := context.WithCancel(context.Background())
141+
manager.mu.cancel = cancel
139142
manager.mu.disks = []*monitoredDisk{testDisk}
140143
monitor1 := Monitor{monitoredDisk: testDisk}
141144
monitor2 := Monitor{monitoredDisk: testDisk}
@@ -150,7 +153,7 @@ func TestMonitor_Close(t *testing.T) {
150153
go monitor2.Close()
151154
// If there are no monitors, stop the stat polling loop.
152155
select {
153-
case <-stop:
156+
case <-ctx.Done():
154157
case <-time.After(time.Second):
155158
t.Fatal("Failed to receive stop signal")
156159
}

0 commit comments

Comments
 (0)