10 changes: 4 additions & 6 deletions pkg/settings/limits/bound.go
@@ -106,8 +106,7 @@ func newBoundLimiter[N Number](f Factory, bound settings.SettingSpec[N]) (BoundL
 	}
 
 	if bound.GetScope() == settings.ScopeGlobal {
-		b.updateCRE(contexts.CRE{})
-		go b.updateLoop(contexts.CRE{})
+		go b.updateLoop(context.Background())
 	}
 
 	return b, nil
@@ -200,13 +199,12 @@ func (b *boundLimiter[N]) get(ctx context.Context) (tenant string, bound N, err

 		u := newUpdater(b.lggr, b.getLimitFn, b.subFn)
 		actual, loaded := b.updaters.LoadOrStore(tenant, u)
-		cre := b.scope.RoundCRE(contexts.CREValue(ctx))
+		creCtx := contexts.WithCRE(ctx, b.scope.RoundCRE(contexts.CREValue(ctx)))
 		if !loaded {
-			u.cre.Store(cre)
-			go u.updateLoop(cre)
+			go u.updateLoop(creCtx)
 		} else {
 			u = actual.(*updater[N])
-			u.updateCRE(cre)
+			u.updateCtx(creCtx)
 		}
 	}

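The same substitution repeats in gate.go, queue.go, rate.go, resource.go, and time.go below: instead of storing a contexts.CRE value beside each per-tenant worker, the caller's context is stamped with the scope-rounded CRE and handed over whole, so the worker inherits the tenant identity together with any other context values in one step. A minimal, self-contained sketch of the spawn-or-notify shape shared by these get/getOrCreate methods; the CRE, WithCRE, CREValue, and RoundCRE stand-ins here are hypothetical toys, not the real contexts/settings APIs:

package main

import (
	"context"
	"fmt"
	"sync"
)

// Toy stand-in for contexts.CRE; the real type lives in chainlink-common.
type CRE struct{ Org, Owner, Workflow string }

type creKey struct{}

// Toy stand-ins for contexts.WithCRE / contexts.CREValue.
func WithCRE(ctx context.Context, cre CRE) context.Context {
	return context.WithValue(ctx, creKey{}, cre)
}

func CREValue(ctx context.Context) CRE {
	cre, _ := ctx.Value(creKey{}).(CRE)
	return cre
}

// RoundCRE truncates the CRE to org granularity, mimicking Scope.RoundCRE.
func RoundCRE(cre CRE) CRE { return CRE{Org: cre.Org} }

type updater struct {
	ctxCh chan context.Context // buffered; carries refreshed contexts to the loop
}

// updateCtx mirrors the new updater.updateCtx: a non-blocking send, so callers
// never stall even when an update is already queued.
func (u *updater) updateCtx(ctx context.Context) {
	select {
	case u.ctxCh <- ctx:
	default:
	}
}

func (u *updater) updateLoop(ctx context.Context) { /* poll or subscribe using ctx; elided */ }

// getOrCreate mirrors the shape of the get/getOrCreate methods in this diff.
func getOrCreate(updaters *sync.Map, tenant string, ctx context.Context) *updater {
	u := &updater{ctxCh: make(chan context.Context, 1)}
	actual, loaded := updaters.LoadOrStore(tenant, u)
	creCtx := WithCRE(ctx, RoundCRE(CREValue(ctx)))
	if !loaded {
		go u.updateLoop(creCtx) // first caller for a tenant starts the loop
		return u
	}
	u = actual.(*updater)
	u.updateCtx(creCtx) // later callers only refresh the running loop's context
	return u
}

func main() {
	var updaters sync.Map
	ctx := WithCRE(context.Background(), CRE{Org: "org-1", Workflow: "wf-9"})
	a := getOrCreate(&updaters, "org-1", ctx)
	b := getOrCreate(&updaters, "org-1", ctx)
	fmt.Println(a == b) // true: the second call notified the existing updater
}

The buffered channel plus default branch means a caller never blocks: if a refresh is already pending, the loop will still observe a recent context.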
6 changes: 3 additions & 3 deletions pkg/settings/limits/gate.go
@@ -105,7 +105,7 @@ func newGateLimiter(f Factory, limit settings.SettingSpec[bool]) (GateLimiter, e

 	// OPT: restore with support for SettingMap
 	//if limit.Default.Scope == settings.ScopeGlobal {
-	//	g.updateCRE(contexts.CRE{})
+	//	g.updateCtx(contexts.CRE{})
 	//	go g.updateLoop(contexts.CRE{})
 	//}
 	close(g.done)
@@ -190,15 +190,15 @@ func (g *gateLimiter) get(ctx context.Context) (tenant string, open bool, err er

 		u := newUpdater(g.lggr, g.getLimitFn, g.subFn)
 		actual, loaded := g.updaters.LoadOrStore(tenant, u)
-		cre := g.scope.RoundCRE(contexts.CREValue(ctx))
+		creCtx := contexts.WithCRE(ctx, g.scope.RoundCRE(contexts.CREValue(ctx)))
 		if !loaded {
 			// OPT: restore with support for SettingMap
 			//u.cre.Store(cre)
 			//go u.updateLoop(cre)
 			close(u.done)
 		} else {
 			u = actual.(*updater[bool])
-			u.updateCRE(cre)
+			u.updateCtx(creCtx)
 		}
 	}

10 changes: 4 additions & 6 deletions pkg/settings/limits/queue.go
@@ -193,8 +193,7 @@ func newUnscopedQueue[T any](f Factory, limit settings.Setting[int]) (QueueLimit
 		}
 	}
 
-	q.cre.Store(contexts.CRE{})
-	go q.updateLoop(contexts.CRE{})
+	go q.updateLoop(context.Background())
 
 	return unscopedQueue[T]{q}, nil
 }
@@ -336,13 +335,12 @@ func (s *scopedQueue[T]) getOrCreate(ctx context.Context) (*queue[T], func(), er

 	q := s.newQueue(tenant)
 	actual, loaded := s.queues.LoadOrStore(tenant, q)
-	cre := s.scope.RoundCRE(contexts.CREValue(ctx))
+	creCtx := contexts.WithCRE(ctx, s.scope.RoundCRE(contexts.CREValue(ctx)))
 	if !loaded {
-		q.cre.Store(cre)
-		go q.updateLoop(cre)
+		go q.updateLoop(creCtx)
 	} else {
 		q = actual.(*queue[T])
-		q.updateCRE(cre)
+		q.updateCtx(creCtx)
 	}
 	return q, s.wg.Done, nil
 }
10 changes: 4 additions & 6 deletions pkg/settings/limits/rate.go
@@ -202,8 +202,7 @@ func (f Factory) globalRateLimiter(limit settings.Setting[config.Rate]) (RateLim
 		l.recordLimit(ctx, float64(r.Limit))
 		l.recordBurst(ctx, int64(r.Burst))
 	}
-	l.cre.Store(contexts.CRE{})
-	go l.updateLoop(contexts.CRE{})
+	go l.updateLoop(context.Background())
 
 	return l, nil
 }
@@ -421,13 +420,12 @@ func (s *scopedRateLimiter) getOrCreate(ctx context.Context) (RateLimiter, func(

 	limiter := s.newRateLimiter(tenant)
 	actual, loaded := s.limiters.LoadOrStore(tenant, limiter)
-	cre := s.scope.RoundCRE(contexts.CREValue(ctx))
+	creCtx := contexts.WithCRE(ctx, s.scope.RoundCRE(contexts.CREValue(ctx)))
 	if !loaded {
-		limiter.cre.Store(cre)
-		go limiter.updateLoop(cre)
+		go limiter.updateLoop(creCtx)
 	} else {
 		limiter = actual.(*rateLimiter)
-		limiter.updateCRE(cre)
+		limiter.updateCtx(creCtx)
 	}
 	return limiter, s.wg.Done, nil
 }
10 changes: 4 additions & 6 deletions pkg/settings/limits/resource.go
@@ -72,8 +72,7 @@ func newGlobalResourcePoolLimiter[N Number](f Factory, limit settings.Setting[N]
 		}
 	}
 
-	l.cre.Store(contexts.CRE{})
-	go l.updateLoop(contexts.CRE{})
+	go l.updateLoop(context.Background())
 
 	return l, nil
 }
@@ -656,13 +655,12 @@ func (s *scopedResourcePoolLimiter[N]) getOrCreate(ctx context.Context) (resourc

 	usage := s.newLimitUsage(tenant)
 	actual, loaded := s.used.LoadOrStore(tenant, usage)
-	cre := s.scope.RoundCRE(contexts.CREValue(ctx))
+	creCtx := contexts.WithCRE(ctx, s.scope.RoundCRE(contexts.CREValue(ctx)))
 	if !loaded {
-		usage.cre.Store(cre)
-		go usage.updateLoop(cre)
+		go usage.updateLoop(creCtx)
 	} else {
 		usage = actual.(*resourcePoolUsage[N])
-		usage.updateCRE(cre)
+		usage.updateCtx(creCtx)
 	}
 
 	return usage, s.wg.Done, nil
5 changes: 2 additions & 3 deletions pkg/settings/limits/resource_test.go
@@ -5,11 +5,10 @@ import (
"errors"
"fmt"
"log"
"sync/atomic"
"testing"
"time"

"sync/atomic"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
@@ -467,7 +466,7 @@ func TestResourcePoolLimiter_LimitFlapToZeroDoesNotDeadlock(t *testing.T) {
 	limiter.getLimitFn = func(context.Context) (int, error) {
 		return int(limit.Load()), nil
 	}
-	go limiter.updateLoop(contexts.CRE{})
+	go limiter.updateLoop(t.Context())
 	t.Cleanup(func() { assert.NoError(t, limiter.Close()) })
 
 	ctx := t.Context()
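In the tests, t.Context() (added in Go 1.24) replaces the placeholder contexts.CRE{} argument: it carries no CRE value but is cancelled just before t.Cleanup callbacks run, so a goroutine driven by it cannot silently outlive the test. A tiny self-contained sketch of that property; toyLoop is hypothetical and, unlike the real updateLoop (which strips caller cancellation and stops only on Close), it exits when its context ends:

package limits_test

import (
	"context"
	"testing"
)

// toyLoop stands in for an update loop; it stops when its context ends.
func toyLoop(ctx context.Context, done chan<- struct{}) {
	defer close(done)
	<-ctx.Done()
}

func TestContextStopsLoop(t *testing.T) {
	done := make(chan struct{})
	go toyLoop(t.Context(), done) // t.Context() is cancelled before cleanups run
	t.Cleanup(func() { <-done })  // so this wait cannot deadlock
}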
10 changes: 4 additions & 6 deletions pkg/settings/limits/time.go
@@ -93,8 +93,7 @@ func (f Factory) newTimeLimiter(timeout settings.Setting[time.Duration]) (TimeLi
 	}
 
 	if timeout.Scope == settings.ScopeGlobal {
-		l.updateCRE(contexts.CRE{})
-		go l.updateLoop(contexts.CRE{})
+		go l.updateLoop(context.Background())
 	}
 
 	return l, nil
@@ -223,13 +222,12 @@ func (l *timeLimiter) get(ctx context.Context) (tenant string, timeout time.Dura

 		u := newUpdater(l.lggr, l.getLimitFn, l.subFn)
 		actual, loaded := l.updaters.LoadOrStore(tenant, u)
-		cre := l.scope.RoundCRE(contexts.CREValue(ctx))
+		creCtx := contexts.WithCRE(ctx, l.scope.RoundCRE(contexts.CREValue(ctx)))
 		if !loaded {
-			u.cre.Store(cre)
-			go u.updateLoop(cre)
+			go u.updateLoop(creCtx)
 		} else {
 			u = actual.(*updater[time.Duration])
-			u.updateCRE(cre)
+			u.updateCtx(creCtx)
 		}
 	}

40 changes: 16 additions & 24 deletions pkg/settings/limits/updater.go
@@ -3,10 +3,8 @@ package limits
 import (
 	"context"
 	"sync"
-	"sync/atomic"
 	"time"
 
-	"github.com/smartcontractkit/chainlink-common/pkg/contexts"
 	"github.com/smartcontractkit/chainlink-common/pkg/logger"
 	"github.com/smartcontractkit/chainlink-common/pkg/services"
 	"github.com/smartcontractkit/chainlink-common/pkg/settings"
@@ -21,8 +19,7 @@ type updater[N any] struct {
 	recordLimit   func(context.Context, N)
 	onLimitUpdate func(context.Context)
 
-	creCh chan struct{}
-	cre   atomic.Value
+	ctxCh chan context.Context // to receive updates
 
 	stopOnce sync.Once
 	stopCh   services.StopChan
@@ -40,7 +37,7 @@ func newUpdater[N any](lggr logger.Logger, getLimitFn func(context.Context) (N,
 		getLimitFn:  getLimitFn,
 		subFn:       subFn,
 		recordLimit: func(ctx context.Context, n N) {}, // no-op
-		creCh:       make(chan struct{}, 1),
+		ctxCh:       make(chan context.Context, 1),
 		stopCh:      make(chan struct{}),
 		done:        make(chan struct{}),
 	}
@@ -59,30 +56,26 @@ func (u *updater[N]) Close() error {

 }
 
-func (u *updater[N]) updateCRE(cre contexts.CRE) {
-	if v := u.cre.Load(); v != nil && v.(contexts.CRE) == cre {
-		return
-	}
-	u.cre.Store(cre)
+func (u *updater[N]) updateCtx(ctx context.Context) {
 	select {
-	case u.creCh <- struct{}{}:
+	case u.ctxCh <- ctx:
 	default:
 	}
 }

 // updateLoop updates the limit either by subscribing via subFn or polling if subFn is not set. It also processes
 // contexts.CRE updates. Stopped by Close.
 // opt: reap after period of non-use
-func (u *updater[N]) updateLoop(cre contexts.CRE) {
+func (u *updater[N]) updateLoop(ctx context.Context) {
 	defer close(u.done)
-	ctx, cancel := u.stopCh.NewCtx()
+	ctx, cancel := u.stopCh.Ctx(context.WithoutCancel(ctx))
 	defer cancel()

 	var updates <-chan settings.Update[N]
 	var cancelSub func()
 	var c <-chan time.Time
 	if u.subFn != nil {
-		updates, cancelSub = u.subFn(contexts.WithCRE(ctx, cre))
+		updates, cancelSub = u.subFn(ctx)
 		defer func() { cancelSub() }() // extra func wrapper is required to ensure we get the final cancelSub value
 		// opt: poll now to initialize
 	} else {
@@ -96,31 +89,30 @@ func (u *updater[N]) updateLoop(cre contexts.CRE) {
 			return
 
 		case <-c:
-			limit, err := u.getLimitFn(contexts.WithCRE(ctx, cre))
+			limit, err := u.getLimitFn(ctx)
 			if err != nil {
 				u.lggr.Errorw("Failed to get limit. Using default value", "default", limit, "err", err)
 			}
-			rcCtx := contexts.WithCRE(ctx, cre)
-			u.recordLimit(rcCtx, limit)
+			u.recordLimit(ctx, limit)
 			if u.onLimitUpdate != nil {
-				u.onLimitUpdate(rcCtx)
+				u.onLimitUpdate(ctx)
 			}
 
 		case update := <-updates:
 			if update.Err != nil {
 				u.lggr.Errorw("Failed to update limit. Using default value", "default", update.Value, "err", update.Err)
 			}
-			rcCtx := contexts.WithCRE(ctx, cre)
-			u.recordLimit(rcCtx, update.Value)
+			u.recordLimit(ctx, update.Value)
 			if u.onLimitUpdate != nil {
-				u.onLimitUpdate(rcCtx)
+				u.onLimitUpdate(ctx)
 			}
 
-		case <-u.creCh:
-			cre = u.cre.Load().(contexts.CRE)
+		case newCtx := <-u.ctxCh:
+			cancel()
+			ctx, cancel = u.stopCh.Ctx(newCtx)
 			if u.subFn != nil {
 				cancelSub()
-				updates, cancelSub = u.subFn(contexts.WithCRE(ctx, cre))
+				updates, cancelSub = u.subFn(ctx)
 			}
 			// opt: update now
 		}
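The ctxCh case is where behavior really changes: on a refresh the loop cancels its derived context, re-derives one that is still tied to the stop channel, and resubscribes when a subscription is active. A stripped-down, standard-library-only sketch of that swap; withStop and tenantKey are hypothetical stand-ins (withStop approximates services.StopChan.Ctx), and unlike the diff this sketch wraps the final cancel in a closure so the last value runs at exit:

package main

import (
	"context"
	"fmt"
	"time"
)

type tenantKey struct{} // hypothetical context key standing in for the CRE value

// withStop approximates services.StopChan.Ctx: the returned context ends when
// stopCh closes, when the parent ends, or when cancel is called.
func withStop(parent context.Context, stopCh <-chan struct{}) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(parent)
	go func() {
		select {
		case <-stopCh:
			cancel()
		case <-ctx.Done():
		}
	}()
	return ctx, cancel
}

// loop mirrors updater.updateLoop: the initial context has its cancellation
// stripped (only Close should stop the loop), and each context received on
// ctxCh replaces the derived working context.
func loop(ctx context.Context, ctxCh <-chan context.Context, stopCh <-chan struct{}, done chan<- struct{}) {
	defer close(done)
	ctx, cancel := withStop(context.WithoutCancel(ctx), stopCh)
	defer func() { cancel() }() // wrapper ensures the final cancel value runs

	tick := time.NewTicker(10 * time.Millisecond)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			fmt.Println("poll as tenant:", ctx.Value(tenantKey{})) // values come from the latest context
		case newCtx := <-ctxCh:
			cancel()                               // release anything tied to the old derived context
			ctx, cancel = withStop(newCtx, stopCh) // as in the diff, the refreshed context keeps its cancellation
			// the real code also tears down and re-creates the subscription here
		}
	}
}

func main() {
	ctxCh := make(chan context.Context, 1)
	stopCh := make(chan struct{})
	done := make(chan struct{})
	go loop(context.WithValue(context.Background(), tenantKey{}, "org-1"), ctxCh, stopCh, done)
	time.Sleep(25 * time.Millisecond)
	ctxCh <- context.WithValue(context.Background(), tenantKey{}, "org-2") // refresh: polls now report org-2
	time.Sleep(25 * time.Millisecond)
	close(stopCh) // Close(): ends the loop regardless of caller contexts
	<-done
}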
32 changes: 23 additions & 9 deletions pkg/settings/limits/updater_test.go
@@ -31,7 +31,7 @@ func Test_updater(t *testing.T) {
 			u := newUpdater[int](logger.Test(t), func(ctx context.Context) (int, error) { return 13, nil }, nil)
 			u.recordLimit = func(ctx context.Context, i int) { got = append(got, i) }
 
-			go u.updateLoop(tt.cre)
+			go u.updateLoop(contexts.WithCRE(t.Context(), tt.cre))
 			time.Sleep(2 * pollPeriod)
 			require.NoError(t, u.Close())

@@ -48,19 +48,26 @@
 			u := newUpdater[int](logger.Test(t), func(ctx context.Context) (int, error) { return int(limit.Load()), nil }, nil)
 			u.recordLimit = func(ctx context.Context, i int) { got = append(got, i) }
 
-			go u.updateLoop(tt.cre)
+			go u.updateLoop(contexts.WithCRE(t.Context(), tt.cre))
 			time.Sleep(2 * pollPeriod)
 			limit.Store(42)
-			cre2 := contexts.CRE{Org: "org-id"}
-			u.updateCRE(cre2)
+			ctx2, cancel := context.WithCancel(contexts.WithCRE(t.Context(), contexts.CRE{Org: "org-id"}))
+			t.Cleanup(cancel)
+			u.updateCtx(ctx2)
 			time.Sleep(2 * pollPeriod)
 			require.NoError(t, u.Close())
 
 			assert.GreaterOrEqual(t, len(got), 2)
 			assert.Equal(t, got[0], 13)
 			assert.Equal(t, got[len(got)-1], 42)
 
-			assert.Equal(t, cre2, u.cre.Load())
+			cancel()
+			select {
+			case <-time.After(5 * time.Second):
+				t.Error("timed out waiting for updater to close")
+			case <-u.done:
+				// success
+			}
 		})
t.Run("sub", func(t *testing.T) {
t.Parallel()
@@ -70,18 +77,25 @@
 				func(ctx context.Context) (<-chan settings.Update[int], func()) { return updates, func() {} })
 			u.recordLimit = func(ctx context.Context, i int) { got = append(got, i) }
 
-			go u.updateLoop(tt.cre)
+			go u.updateLoop(contexts.WithCRE(t.Context(), tt.cre))
 			updates <- settings.Update[int]{Value: 42}
 			updates <- settings.Update[int]{Value: 100}
-			cre2 := contexts.CRE{Org: "org-id"}
-			u.updateCRE(cre2)
+			ctx2, cancel := context.WithCancel(contexts.WithCRE(t.Context(), contexts.CRE{Org: "org-id"}))
+			t.Cleanup(cancel)
+			u.updateCtx(ctx2)
 			require.NoError(t, u.Close())
 
 			assert.GreaterOrEqual(t, len(got), 2)
 			assert.Equal(t, got[0], 42)
 			assert.Equal(t, got[len(got)-1], 100)
 
-			assert.Equal(t, cre2, u.cre.Load())
+			cancel()
+			select {
+			case <-time.After(5 * time.Second):
+				t.Error("timed out waiting for updater to close")
+			case <-u.done:
+				// success
+			}
 		})
 	})
 }