Skip to content

Commit 4f0a1f8

Browse files
Alex Couture-Beilalexcb
authored andcommitted
sampler data race fix
Protect against data race occuring while both sampler.Run and sub.Close accesses the same data from multiple threads. Signed-off-by: Alex Couture-Beil <[email protected]>
1 parent a7789ee commit 4f0a1f8

File tree

1 file changed

+10
-1
lines changed

1 file changed

+10
-1
lines changed

executor/resources/sampler.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type Sub[T WithTimestamp] struct {
2626
first time.Time
2727
last time.Time
2828
samples []T
29+
mu sync.RWMutex
2930
err error
3031
}
3132

@@ -34,6 +35,9 @@ func (s *Sub[T]) Close(captureLast bool) ([]T, error) {
3435
delete(s.sampler.subs, s)
3536
s.sampler.mu.Unlock()
3637

38+
s.mu.Lock()
39+
defer s.mu.Unlock()
40+
3741
if s.err != nil {
3842
return nil, s.err
3943
}
@@ -94,13 +98,16 @@ func (s *Sampler[T]) run() {
9498
return
9599
case <-ticker.C:
96100
tm := time.Now()
97-
active := make([]*Sub[T], 0, len(s.subs))
98101
s.mu.RLock()
102+
active := make([]*Sub[T], 0, len(s.subs))
99103
for ss := range s.subs {
104+
ss.mu.Lock()
100105
if tm.Sub(ss.last) < ss.interval {
106+
ss.mu.Unlock()
101107
continue
102108
}
103109
ss.last = tm
110+
ss.mu.Unlock()
104111
active = append(active, ss)
105112
}
106113
s.mu.RUnlock()
@@ -110,6 +117,7 @@ func (s *Sampler[T]) run() {
110117
}
111118
value, err := s.callback(tm)
112119
for _, ss := range active {
120+
ss.mu.Lock()
113121
if err != nil {
114122
ss.err = err
115123
} else {
@@ -120,6 +128,7 @@ func (s *Sampler[T]) run() {
120128
if time.Duration(ss.interval)*time.Duration(s.maxSamples) <= dur {
121129
ss.interval *= 2
122130
}
131+
ss.mu.Unlock()
123132
}
124133
}
125134
}

0 commit comments

Comments
 (0)