Skip to content

Commit 8f90cd5

Browse files
authored
pkg/settings/limits: pass context to updater instead of CRE values (#1809)
1 parent b7528e7 commit 8f90cd5

File tree

9 files changed

+64
-69
lines changed

9 files changed

+64
-69
lines changed

pkg/settings/limits/bound.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,7 @@ func newBoundLimiter[N Number](f Factory, bound settings.SettingSpec[N]) (BoundL
106106
}
107107

108108
if bound.GetScope() == settings.ScopeGlobal {
109-
b.updateCRE(contexts.CRE{})
110-
go b.updateLoop(contexts.CRE{})
109+
go b.updateLoop(context.Background())
111110
}
112111

113112
return b, nil
@@ -200,13 +199,12 @@ func (b *boundLimiter[N]) get(ctx context.Context) (tenant string, bound N, err
200199

201200
u := newUpdater(b.lggr, b.getLimitFn, b.subFn)
202201
actual, loaded := b.updaters.LoadOrStore(tenant, u)
203-
cre := b.scope.RoundCRE(contexts.CREValue(ctx))
202+
creCtx := contexts.WithCRE(ctx, b.scope.RoundCRE(contexts.CREValue(ctx)))
204203
if !loaded {
205-
u.cre.Store(cre)
206-
go u.updateLoop(cre)
204+
go u.updateLoop(creCtx)
207205
} else {
208206
u = actual.(*updater[N])
209-
u.updateCRE(cre)
207+
u.updateCtx(creCtx)
210208
}
211209
}
212210

pkg/settings/limits/gate.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func newGateLimiter(f Factory, limit settings.SettingSpec[bool]) (GateLimiter, e
105105

106106
// OPT: restore with support for SettingMap
107107
//if limit.Default.Scope == settings.ScopeGlobal {
108-
// g.updateCRE(contexts.CRE{})
108+
// g.updateCtx(contexts.CRE{})
109109
// go g.updateLoop(contexts.CRE{})
110110
//}
111111
close(g.done)
@@ -190,15 +190,15 @@ func (g *gateLimiter) get(ctx context.Context) (tenant string, open bool, err er
190190

191191
u := newUpdater(g.lggr, g.getLimitFn, g.subFn)
192192
actual, loaded := g.updaters.LoadOrStore(tenant, u)
193-
cre := g.scope.RoundCRE(contexts.CREValue(ctx))
193+
creCtx := contexts.WithCRE(ctx, g.scope.RoundCRE(contexts.CREValue(ctx)))
194194
if !loaded {
195195
// OPT: restore with support for SettingMap
196196
//u.cre.Store(cre)
197197
//go u.updateLoop(cre)
198198
close(u.done)
199199
} else {
200200
u = actual.(*updater[bool])
201-
u.updateCRE(cre)
201+
u.updateCtx(creCtx)
202202
}
203203
}
204204

pkg/settings/limits/queue.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,7 @@ func newUnscopedQueue[T any](f Factory, limit settings.Setting[int]) (QueueLimit
193193
}
194194
}
195195

196-
q.cre.Store(contexts.CRE{})
197-
go q.updateLoop(contexts.CRE{})
196+
go q.updateLoop(context.Background())
198197

199198
return unscopedQueue[T]{q}, nil
200199
}
@@ -336,13 +335,12 @@ func (s *scopedQueue[T]) getOrCreate(ctx context.Context) (*queue[T], func(), er
336335

337336
q := s.newQueue(tenant)
338337
actual, loaded := s.queues.LoadOrStore(tenant, q)
339-
cre := s.scope.RoundCRE(contexts.CREValue(ctx))
338+
creCtx := contexts.WithCRE(ctx, s.scope.RoundCRE(contexts.CREValue(ctx)))
340339
if !loaded {
341-
q.cre.Store(cre)
342-
go q.updateLoop(cre)
340+
go q.updateLoop(creCtx)
343341
} else {
344342
q = actual.(*queue[T])
345-
q.updateCRE(cre)
343+
q.updateCtx(creCtx)
346344
}
347345
return q, s.wg.Done, nil
348346
}

pkg/settings/limits/rate.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,7 @@ func (f Factory) globalRateLimiter(limit settings.Setting[config.Rate]) (RateLim
202202
l.recordLimit(ctx, float64(r.Limit))
203203
l.recordBurst(ctx, int64(r.Burst))
204204
}
205-
l.cre.Store(contexts.CRE{})
206-
go l.updateLoop(contexts.CRE{})
205+
go l.updateLoop(context.Background())
207206

208207
return l, nil
209208
}
@@ -421,13 +420,12 @@ func (s *scopedRateLimiter) getOrCreate(ctx context.Context) (RateLimiter, func(
421420

422421
limiter := s.newRateLimiter(tenant)
423422
actual, loaded := s.limiters.LoadOrStore(tenant, limiter)
424-
cre := s.scope.RoundCRE(contexts.CREValue(ctx))
423+
creCtx := contexts.WithCRE(ctx, s.scope.RoundCRE(contexts.CREValue(ctx)))
425424
if !loaded {
426-
limiter.cre.Store(cre)
427-
go limiter.updateLoop(cre)
425+
go limiter.updateLoop(creCtx)
428426
} else {
429427
limiter = actual.(*rateLimiter)
430-
limiter.updateCRE(cre)
428+
limiter.updateCtx(creCtx)
431429
}
432430
return limiter, s.wg.Done, nil
433431
}

pkg/settings/limits/resource.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,7 @@ func newGlobalResourcePoolLimiter[N Number](f Factory, limit settings.Setting[N]
7272
}
7373
}
7474

75-
l.cre.Store(contexts.CRE{})
76-
go l.updateLoop(contexts.CRE{})
75+
go l.updateLoop(context.Background())
7776

7877
return l, nil
7978
}
@@ -656,13 +655,12 @@ func (s *scopedResourcePoolLimiter[N]) getOrCreate(ctx context.Context) (resourc
656655

657656
usage := s.newLimitUsage(tenant)
658657
actual, loaded := s.used.LoadOrStore(tenant, usage)
659-
cre := s.scope.RoundCRE(contexts.CREValue(ctx))
658+
creCtx := contexts.WithCRE(ctx, s.scope.RoundCRE(contexts.CREValue(ctx)))
660659
if !loaded {
661-
usage.cre.Store(cre)
662-
go usage.updateLoop(cre)
660+
go usage.updateLoop(creCtx)
663661
} else {
664662
usage = actual.(*resourcePoolUsage[N])
665-
usage.updateCRE(cre)
663+
usage.updateCtx(creCtx)
666664
}
667665

668666
return usage, s.wg.Done, nil

pkg/settings/limits/resource_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,10 @@ import (
55
"errors"
66
"fmt"
77
"log"
8+
"sync/atomic"
89
"testing"
910
"time"
1011

11-
"sync/atomic"
12-
1312
"github.com/stretchr/testify/assert"
1413
"github.com/stretchr/testify/require"
1514
"go.opentelemetry.io/otel/attribute"
@@ -467,7 +466,7 @@ func TestResourcePoolLimiter_LimitFlapToZeroDoesNotDeadlock(t *testing.T) {
467466
limiter.getLimitFn = func(context.Context) (int, error) {
468467
return int(limit.Load()), nil
469468
}
470-
go limiter.updateLoop(contexts.CRE{})
469+
go limiter.updateLoop(t.Context())
471470
t.Cleanup(func() { assert.NoError(t, limiter.Close()) })
472471

473472
ctx := t.Context()

pkg/settings/limits/time.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,7 @@ func (f Factory) newTimeLimiter(timeout settings.Setting[time.Duration]) (TimeLi
9393
}
9494

9595
if timeout.Scope == settings.ScopeGlobal {
96-
l.updateCRE(contexts.CRE{})
97-
go l.updateLoop(contexts.CRE{})
96+
go l.updateLoop(context.Background())
9897
}
9998

10099
return l, nil
@@ -223,13 +222,12 @@ func (l *timeLimiter) get(ctx context.Context) (tenant string, timeout time.Dura
223222

224223
u := newUpdater(l.lggr, l.getLimitFn, l.subFn)
225224
actual, loaded := l.updaters.LoadOrStore(tenant, u)
226-
cre := l.scope.RoundCRE(contexts.CREValue(ctx))
225+
creCtx := contexts.WithCRE(ctx, l.scope.RoundCRE(contexts.CREValue(ctx)))
227226
if !loaded {
228-
u.cre.Store(cre)
229-
go u.updateLoop(cre)
227+
go u.updateLoop(creCtx)
230228
} else {
231229
u = actual.(*updater[time.Duration])
232-
u.updateCRE(cre)
230+
u.updateCtx(creCtx)
233231
}
234232
}
235233

pkg/settings/limits/updater.go

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@ package limits
33
import (
44
"context"
55
"sync"
6-
"sync/atomic"
76
"time"
87

9-
"github.com/smartcontractkit/chainlink-common/pkg/contexts"
108
"github.com/smartcontractkit/chainlink-common/pkg/logger"
119
"github.com/smartcontractkit/chainlink-common/pkg/services"
1210
"github.com/smartcontractkit/chainlink-common/pkg/settings"
@@ -21,8 +19,7 @@ type updater[N any] struct {
2119
recordLimit func(context.Context, N)
2220
onLimitUpdate func(context.Context)
2321

24-
creCh chan struct{}
25-
cre atomic.Value
22+
ctxCh chan context.Context // to receive updates
2623

2724
stopOnce sync.Once
2825
stopCh services.StopChan
@@ -40,7 +37,7 @@ func newUpdater[N any](lggr logger.Logger, getLimitFn func(context.Context) (N,
4037
getLimitFn: getLimitFn,
4138
subFn: subFn,
4239
recordLimit: func(ctx context.Context, n N) {}, // no-op
43-
creCh: make(chan struct{}, 1),
40+
ctxCh: make(chan context.Context, 1),
4441
stopCh: make(chan struct{}),
4542
done: make(chan struct{}),
4643
}
@@ -59,30 +56,26 @@ func (u *updater[N]) Close() error {
5956

6057
}
6158

62-
func (u *updater[N]) updateCRE(cre contexts.CRE) {
63-
if v := u.cre.Load(); v != nil && v.(contexts.CRE) == cre {
64-
return
65-
}
66-
u.cre.Store(cre)
59+
func (u *updater[N]) updateCtx(ctx context.Context) {
6760
select {
68-
case u.creCh <- struct{}{}:
61+
case u.ctxCh <- ctx:
6962
default:
7063
}
7164
}
7265

7366
// updateLoop updates the limit either by subscribing via subFn or polling if subFn is not set. It also processes
7467
// contexts.CRE updates. Stopped by Close.
7568
// opt: reap after period of non-use
76-
func (u *updater[N]) updateLoop(cre contexts.CRE) {
69+
func (u *updater[N]) updateLoop(ctx context.Context) {
7770
defer close(u.done)
78-
ctx, cancel := u.stopCh.NewCtx()
71+
ctx, cancel := u.stopCh.Ctx(context.WithoutCancel(ctx))
7972
defer cancel()
8073

8174
var updates <-chan settings.Update[N]
8275
var cancelSub func()
8376
var c <-chan time.Time
8477
if u.subFn != nil {
85-
updates, cancelSub = u.subFn(contexts.WithCRE(ctx, cre))
78+
updates, cancelSub = u.subFn(ctx)
8679
defer func() { cancelSub() }() // extra func wrapper is required to ensure we get the final cancelSub value
8780
// opt: poll now to initialize
8881
} else {
@@ -96,31 +89,30 @@ func (u *updater[N]) updateLoop(cre contexts.CRE) {
9689
return
9790

9891
case <-c:
99-
limit, err := u.getLimitFn(contexts.WithCRE(ctx, cre))
92+
limit, err := u.getLimitFn(ctx)
10093
if err != nil {
10194
u.lggr.Errorw("Failed to get limit. Using default value", "default", limit, "err", err)
10295
}
103-
rcCtx := contexts.WithCRE(ctx, cre)
104-
u.recordLimit(rcCtx, limit)
96+
u.recordLimit(ctx, limit)
10597
if u.onLimitUpdate != nil {
106-
u.onLimitUpdate(rcCtx)
98+
u.onLimitUpdate(ctx)
10799
}
108100

109101
case update := <-updates:
110102
if update.Err != nil {
111103
u.lggr.Errorw("Failed to update limit. Using default value", "default", update.Value, "err", update.Err)
112104
}
113-
rcCtx := contexts.WithCRE(ctx, cre)
114-
u.recordLimit(rcCtx, update.Value)
105+
u.recordLimit(ctx, update.Value)
115106
if u.onLimitUpdate != nil {
116-
u.onLimitUpdate(rcCtx)
107+
u.onLimitUpdate(ctx)
117108
}
118109

119-
case <-u.creCh:
120-
cre = u.cre.Load().(contexts.CRE)
110+
case newCtx := <-u.ctxCh:
111+
cancel()
112+
ctx, cancel = u.stopCh.Ctx(newCtx)
121113
if u.subFn != nil {
122114
cancelSub()
123-
updates, cancelSub = u.subFn(contexts.WithCRE(ctx, cre))
115+
updates, cancelSub = u.subFn(ctx)
124116
}
125117
// opt: update now
126118
}

pkg/settings/limits/updater_test.go

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func Test_updater(t *testing.T) {
3131
u := newUpdater[int](logger.Test(t), func(ctx context.Context) (int, error) { return 13, nil }, nil)
3232
u.recordLimit = func(ctx context.Context, i int) { got = append(got, i) }
3333

34-
go u.updateLoop(tt.cre)
34+
go u.updateLoop(contexts.WithCRE(t.Context(), tt.cre))
3535
time.Sleep(2 * pollPeriod)
3636
require.NoError(t, u.Close())
3737

@@ -48,19 +48,26 @@ func Test_updater(t *testing.T) {
4848
u := newUpdater[int](logger.Test(t), func(ctx context.Context) (int, error) { return int(limit.Load()), nil }, nil)
4949
u.recordLimit = func(ctx context.Context, i int) { got = append(got, i) }
5050

51-
go u.updateLoop(tt.cre)
51+
go u.updateLoop(contexts.WithCRE(t.Context(), tt.cre))
5252
time.Sleep(2 * pollPeriod)
5353
limit.Store(42)
54-
cre2 := contexts.CRE{Org: "org-id"}
55-
u.updateCRE(cre2)
54+
ctx2, cancel := context.WithCancel(contexts.WithCRE(t.Context(), contexts.CRE{Org: "org-id"}))
55+
t.Cleanup(cancel)
56+
u.updateCtx(ctx2)
5657
time.Sleep(2 * pollPeriod)
5758
require.NoError(t, u.Close())
5859

5960
assert.GreaterOrEqual(t, len(got), 2)
6061
assert.Equal(t, got[0], 13)
6162
assert.Equal(t, got[len(got)-1], 42)
6263

63-
assert.Equal(t, cre2, u.cre.Load())
64+
cancel()
65+
select {
66+
case <-time.After(5 * time.Second):
67+
t.Error("timed out waiting for updater to close")
68+
case <-u.done:
69+
// success
70+
}
6471
})
6572
t.Run("sub", func(t *testing.T) {
6673
t.Parallel()
@@ -70,18 +77,25 @@ func Test_updater(t *testing.T) {
7077
func(ctx context.Context) (<-chan settings.Update[int], func()) { return updates, func() {} })
7178
u.recordLimit = func(ctx context.Context, i int) { got = append(got, i) }
7279

73-
go u.updateLoop(tt.cre)
80+
go u.updateLoop(contexts.WithCRE(t.Context(), tt.cre))
7481
updates <- settings.Update[int]{Value: 42}
7582
updates <- settings.Update[int]{Value: 100}
76-
cre2 := contexts.CRE{Org: "org-id"}
77-
u.updateCRE(cre2)
83+
ctx2, cancel := context.WithCancel(contexts.WithCRE(t.Context(), contexts.CRE{Org: "org-id"}))
84+
t.Cleanup(cancel)
85+
u.updateCtx(ctx2)
7886
require.NoError(t, u.Close())
7987

8088
assert.GreaterOrEqual(t, len(got), 2)
8189
assert.Equal(t, got[0], 42)
8290
assert.Equal(t, got[len(got)-1], 100)
8391

84-
assert.Equal(t, cre2, u.cre.Load())
92+
cancel()
93+
select {
94+
case <-time.After(5 * time.Second):
95+
t.Error("timed out waiting for updater to close")
96+
case <-u.done:
97+
// success
98+
}
8599
})
86100
})
87101
}

0 commit comments

Comments
 (0)