Skip to content

Commit e6e5837

Browse files
committed
storage/disk: use context cancellation to stop monitoring
Address a pending TODO to use context cancellation, rather than an explicit stop channel, to stop monitoring of a disk. Epic: none Release note: none
1 parent 009a0e4 commit e6e5837

File tree

2 files changed

+21
-26
lines changed

2 files changed

+21
-26
lines changed

pkg/storage/disk/monitor.go

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ type MonitorManager struct {
4949

5050
mu struct {
5151
syncutil.Mutex
52-
stop chan struct{}
53-
disks []*monitoredDisk
52+
cancel context.CancelFunc
53+
disks []*monitoredDisk
5454
}
5555
}
5656

@@ -90,13 +90,14 @@ func (m *MonitorManager) Monitor(path string) (*Monitor, error) {
9090

9191
// The design maintains the invariant that the disk stat polling loop
9292
// is always running unless there are no disks being monitored.
93-
if m.mu.stop == nil {
93+
if m.mu.cancel == nil {
9494
collector, err := newStatsCollector(m.fs)
9595
if err != nil {
9696
return nil, err
9797
}
98-
m.mu.stop = make(chan struct{})
99-
go m.monitorDisks(collector, m.mu.stop)
98+
ctx, cancel := context.WithCancel(context.Background())
99+
m.mu.cancel = cancel
100+
go m.monitorDisks(ctx, collector)
100101
}
101102
}
102103
disk.refCount++
@@ -105,7 +106,7 @@ func (m *MonitorManager) Monitor(path string) (*Monitor, error) {
105106
}
106107

107108
func (m *MonitorManager) unrefDisk(disk *monitoredDisk) {
108-
var stop chan struct{}
109+
var cancel context.CancelFunc
109110
func() {
110111
m.mu.Lock()
111112
defer m.mu.Unlock()
@@ -123,14 +124,14 @@ func (m *MonitorManager) unrefDisk(disk *monitoredDisk) {
123124
// If the MonitorManager has no disks left to monitor, the disk stat polling loop can
124125
// be stopped.
125126
if len(m.mu.disks) == 0 {
126-
stop = m.mu.stop
127-
m.mu.stop = nil
127+
cancel = m.mu.cancel
128+
m.mu.cancel = nil
128129
}
129130
}
130131
}()
131132

132-
if stop != nil {
133-
stop <- struct{}{}
133+
if cancel != nil {
134+
cancel()
134135
}
135136
}
136137

@@ -139,22 +140,15 @@ type statsCollector interface {
139140
}
140141

141142
// monitorDisks runs a loop collecting disk stats for all monitored disks.
142-
//
143-
// NB: A stop channel must be passed down to ensure that the function terminates during the
144-
// race where the MonitorManager creates a new stop channel after unrefDisk sends a message
145-
// across the old stop channel.
146-
func (m *MonitorManager) monitorDisks(collector statsCollector, stop chan struct{}) {
147-
// TODO(jackson): Should we propagate a context here to replace the stop
148-
// channel?
149-
ctx := context.TODO()
143+
// monitorDisks returns when the context is done.
144+
func (m *MonitorManager) monitorDisks(ctx context.Context, collector statsCollector) {
150145
ticker := time.NewTicker(DefaultDiskStatsPollingInterval)
151146
defer ticker.Stop()
152147

153148
every := log.Every(5 * time.Minute)
154149
for {
155150
select {
156-
case <-stop:
157-
close(stop)
151+
case <-ctx.Done():
158152
return
159153
case <-ticker.C:
160154
m.mu.Lock()

pkg/storage/disk/monitor_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package disk
77

88
import (
9+
"context"
910
"sync/atomic"
1011
"testing"
1112
"time"
@@ -43,13 +44,13 @@ func TestMonitorManager_monitorDisks(t *testing.T) {
4344
manager.mu.disks = []*monitoredDisk{testDisk}
4445

4546
testCollector := &spyCollector{}
46-
stop := make(chan struct{})
47-
go manager.monitorDisks(testCollector, stop)
47+
ctx, cancel := context.WithCancel(context.Background())
48+
defer cancel()
49+
go manager.monitorDisks(ctx, testCollector)
4850

4951
require.Eventually(t, func() bool {
5052
return testCollector.collectCallCount.Load() > 0
5153
}, 100*DefaultDiskStatsPollingInterval, DefaultDiskStatsPollingInterval)
52-
stop <- struct{}{}
5354
}
5455

5556
func TestMonitor_StatsWindow(t *testing.T) {
@@ -136,8 +137,8 @@ func TestMonitor_Close(t *testing.T) {
136137
},
137138
refCount: 2,
138139
}
139-
stop := make(chan struct{})
140-
manager.mu.stop = stop
140+
ctx, cancel := context.WithCancel(context.Background())
141+
manager.mu.cancel = cancel
141142
manager.mu.disks = []*monitoredDisk{testDisk}
142143
monitor1 := Monitor{monitoredDisk: testDisk}
143144
monitor2 := Monitor{monitoredDisk: testDisk}
@@ -152,7 +153,7 @@ func TestMonitor_Close(t *testing.T) {
152153
go monitor2.Close()
153154
// If there are no monitors, stop the stat polling loop.
154155
select {
155-
case <-stop:
156+
case <-ctx.Done():
156157
case <-time.After(time.Second):
157158
t.Fatal("Failed to receive stop signal")
158159
}

0 commit comments

Comments
 (0)