Skip to content

Commit 86a740f

Browse files
Alex Couture-Beilalexcb
authored andcommitted
use sampler lock instead
Signed-off-by: Alex Couture-Beil <[email protected]>
1 parent 4f0a1f8 commit 86a740f

File tree

1 file changed

+10
-13
lines changed

1 file changed

+10
-13
lines changed

executor/resources/sampler.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ type WithTimestamp interface {
1010
}
1111

1212
type Sampler[T WithTimestamp] struct {
13-
mu sync.RWMutex
13+
mu sync.Mutex
1414
minInterval time.Duration
1515
maxSamples int
1616
callback func(ts time.Time) (T, error)
@@ -26,19 +26,15 @@ type Sub[T WithTimestamp] struct {
2626
first time.Time
2727
last time.Time
2828
samples []T
29-
mu sync.RWMutex
3029
err error
3130
}
3231

3332
func (s *Sub[T]) Close(captureLast bool) ([]T, error) {
3433
s.sampler.mu.Lock()
3534
delete(s.sampler.subs, s)
36-
s.sampler.mu.Unlock()
37-
38-
s.mu.Lock()
39-
defer s.mu.Unlock()
4035

4136
if s.err != nil {
37+
s.sampler.mu.Unlock()
4238
return nil, s.err
4339
}
4440
current := s.first
@@ -50,6 +46,7 @@ func (s *Sub[T]) Close(captureLast bool) ([]T, error) {
5046
current = ts
5147
}
5248
}
49+
s.sampler.mu.Unlock()
5350

5451
if captureLast {
5552
v, err := s.sampler.callback(time.Now())
@@ -98,26 +95,26 @@ func (s *Sampler[T]) run() {
9895
return
9996
case <-ticker.C:
10097
tm := time.Now()
101-
s.mu.RLock()
98+
s.mu.Lock()
10299
active := make([]*Sub[T], 0, len(s.subs))
103100
for ss := range s.subs {
104-
ss.mu.Lock()
105101
if tm.Sub(ss.last) < ss.interval {
106-
ss.mu.Unlock()
107102
continue
108103
}
109104
ss.last = tm
110-
ss.mu.Unlock()
111105
active = append(active, ss)
112106
}
113-
s.mu.RUnlock()
107+
s.mu.Unlock()
114108
ticker = time.NewTimer(s.minInterval)
115109
if len(active) == 0 {
116110
continue
117111
}
118112
value, err := s.callback(tm)
113+
s.mu.Lock()
119114
for _, ss := range active {
120-
ss.mu.Lock()
115+
if _, found := s.subs[ss]; !found {
116+
continue // skip if Close() was called while the lock was released
117+
}
121118
if err != nil {
122119
ss.err = err
123120
} else {
@@ -128,8 +125,8 @@ func (s *Sampler[T]) run() {
128125
if time.Duration(ss.interval)*time.Duration(s.maxSamples) <= dur {
129126
ss.interval *= 2
130127
}
131-
ss.mu.Unlock()
132128
}
129+
s.mu.Unlock()
133130
}
134131
}
135132
}

0 commit comments

Comments
 (0)