
Commit a205890

fix: handle jitter correctly (#2278)
1 parent e7c05cb

File tree

4 files changed (+168, -47 lines)

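In short: the previous Poll used a fixed time.Ticker and slept for a random jitter inside the tick branch before invoking the handler. That sleep blocked context cancellation for up to maxJitter, and because each tick drew a fresh jitter, two consecutive handler runs could end up spaced less than the interval apart. The new Poll drops the ticker entirely: it arms a single time.Timer with interval + jitter and re-arms it with a fresh jitter after each handler run, guaranteeing at least the interval between runs. With the polling goroutine now ending on context cancellation, the Stop methods on Poll and ConfigPoller become redundant and are removed.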

router/core/router.go

Lines changed: 0 additions & 6 deletions

@@ -1399,12 +1399,6 @@ func (r *Router) Shutdown(ctx context.Context) error {
 		ctx = ctxWithTimer
 	}
 
-	if r.configPoller != nil {
-		if subErr := r.configPoller.Stop(ctx); subErr != nil {
-			err.Append(fmt.Errorf("failed to stop config poller: %w", subErr))
-		}
-	}
-
 	if r.httpServer != nil {
 		if subErr := r.httpServer.Shutdown(ctx); subErr != nil {
 			if errors.Is(subErr, context.DeadlineExceeded) {

router/pkg/controlplane/configpoller/config_poller.go

Lines changed: 0 additions & 7 deletions

@@ -26,8 +26,6 @@ type ConfigPoller interface {
 	// If the Config is nil, no new config is available and the current config should be used.
 	// and updates the latest router config version. This method is only used for the initial config
 	GetRouterConfig(ctx context.Context) (*routerconfig.Response, error)
-	// Stop stops the config poller. After calling stop, the config poller cannot be used again.
-	Stop(ctx context.Context) error
 }
 
 type configPoller struct {
@@ -65,11 +63,6 @@ func (c *configPoller) Version() string {
 	return c.latestRouterConfigVersion
 }
 
-// Stop stops the config poller
-func (c *configPoller) Stop(_ context.Context) error {
-	return c.poller.Stop()
-}
-
 func (c *configPoller) Subscribe(ctx context.Context, handler func(newConfig *nodev1.RouterConfig, _ string) error) {
 	c.poller.Subscribe(ctx, func() {
 		start := time.Now()
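With Stop gone from both the Poller and ConfigPoller interfaces, the poller's lifetime is governed entirely by the context handed to Subscribe — which is presumably why the router.go change above can drop the explicit Stop call from Shutdown. A minimal sketch of the new usage pattern, assuming the package is importable as github.com/wundergraph/cosmo/router/pkg/controlplane (the handler body is illustrative, not the router's actual wiring):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/wundergraph/cosmo/router/pkg/controlplane"
)

func main() {
	// The context now owns the poller's lifecycle.
	ctx, cancel := context.WithCancel(context.Background())

	p := controlplane.NewPoll(200*time.Millisecond, 50*time.Millisecond)
	p.Subscribe(ctx, func() {
		fmt.Println("would fetch the latest router config here")
	})

	// Let a few polls run, then shut down by cancelling the context;
	// there is no longer a Stop(ctx) call to make.
	time.Sleep(time.Second)
	cancel()
}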

router/pkg/controlplane/poll.go

Lines changed: 22 additions & 28 deletions

@@ -11,58 +11,52 @@ type Poller interface {
 	// Subscribe subscribes to the poller with a handler function that will be invoked
 	// Must only be called once. If the handler is busy during a tick, the next tick will be skipped.
 	Subscribe(ctx context.Context, handler func())
-	// Stop stops the poller. That means no more events will be emitted.
-	Stop() error
 }
 
 type Poll struct {
-	ticker *time.Ticker
-
+	interval  time.Duration
 	maxJitter time.Duration
 }
 
 // NewPoll creates a new poller that emits events at the given interval
 // and executes the given handler function in a separate goroutine.
 func NewPoll(interval time.Duration, maxJitter time.Duration) *Poll {
-	p := &Poll{
-		maxJitter: maxJitter,
+	// interval must be positive
+	if interval <= 0 {
+		panic("non-positive interval")
 	}
 
-	// maxJitter must be positive, otherwise the random duration function will panic
+	// maxJitter must be non-negative, otherwise the random duration function will panic
 	if maxJitter < 0 {
 		panic("negative max jitter")
 	}
 
-	p.ticker = time.NewTicker(interval)
-
-	return p
-}
-
-// Stop stops the poller. That means no more events will be emitted.
-// After calling stop, the poller cannot be used again.
-func (c *Poll) Stop() error {
-	c.ticker.Stop()
-	return nil
+	return &Poll{
+		interval:  interval,
+		maxJitter: maxJitter,
+	}
 }
 
 func (c *Poll) Subscribe(ctx context.Context, handler func()) {
 	go func() {
+		// Calculate initial delay: interval + jitter
+		jitter := timex.RandomDuration(c.maxJitter)
+		timer := time.NewTimer(c.interval + jitter)
+		defer timer.Stop()
+
 		for {
 			select {
 			case <-ctx.Done():
-				c.ticker.Stop()
 				return
-			case <-c.ticker.C:
-				// If the current handler is still in progress
-				// the next tick will be skipped. This is how a timer
-				// is implemented in the standard library.
-
-				// Add jitter to the interval
-				// This is to prevent all clients from hitting the server at exactly the same time,
-				// which could cause a burst load issue
-				time.Sleep(timex.RandomDuration(c.maxJitter))
-
+			case <-timer.C:
+				// Execute handler
				handler()
+
+				// Calculate next execution time: interval + new jitter
+				// This ensures we always wait at least 'interval' time between executions
+				jitter := timex.RandomDuration(c.maxJitter)
+				nextDelay := c.interval + jitter
+				timer.Reset(nextDelay)
 			}
 		}
 	}()
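This hunk is the substance of the fix. The old loop ticked at a fixed rate and slept for the jitter inside the tick branch, which both blocked the select (so ctx.Done() could not be observed during the sleep) and let two handler runs land closer than the interval apart whenever a small jitter followed a large one. The new loop arms one time.Timer and re-arms it with interval plus a fresh jitter only after the handler returns. A self-contained sketch of the same pattern, with a local randomDuration standing in for the internal timex.RandomDuration helper:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

// randomDuration stands in for the internal timex.RandomDuration:
// a uniformly random duration in [0, max).
func randomDuration(max time.Duration) time.Duration {
	if max <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(max)))
}

// pollWithJitter mirrors the fixed loop: a single timer, re-armed to
// interval + fresh jitter after each handler run, so consecutive runs
// are always spaced at least `interval` apart and cancellation is
// observed immediately between runs.
func pollWithJitter(ctx context.Context, interval, maxJitter time.Duration, handler func()) {
	go func() {
		timer := time.NewTimer(interval + randomDuration(maxJitter))
		defer timer.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-timer.C:
				handler()
				timer.Reset(interval + randomDuration(maxJitter))
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	last := time.Now()
	pollWithJitter(ctx, 100*time.Millisecond, 50*time.Millisecond, func() {
		now := time.Now()
		fmt.Printf("ran %v after the previous run\n", now.Sub(last))
		last = now
	})
	<-ctx.Done()
}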

router/pkg/controlplane/poll_test.go

Lines changed: 146 additions & 6 deletions

@@ -1,10 +1,13 @@
 package controlplane
 
 import (
+	"context"
+	"sync/atomic"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 
 func Test_Poller(t *testing.T) {
@@ -23,13 +26,150 @@ func Test_Poller(t *testing.T) {
 		})
 	})
 
-	// This is a guarunteed pass because Poll.Stop() always returns nil,
-	// but it's good to have a test for it should there be an error in the future
-	t.Run("stopping should work correctly", func(t *testing.T) {
-		p := NewPoll(1*time.Second, 0*time.Second)
+	t.Run("interval plus jitter timing should work correctly", func(t *testing.T) {
+		interval := 100 * time.Millisecond
+		maxJitter := 50 * time.Millisecond
+		expectedMinInterval := interval
+		expectedMaxInterval := interval + maxJitter
 
-		err := p.Stop()
+		p := NewPoll(interval, maxJitter)
 
-		assert.NoError(t, err)
+		// Record execution timestamps
+		var timestamps []time.Time
+		executionCount := 0
+		targetExecutions := 4
+
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+		defer cancel()
+
+		p.Subscribe(ctx, func() {
+			timestamps = append(timestamps, time.Now())
+			executionCount++
+
+			// Cancel after we have enough executions
+			if executionCount >= targetExecutions {
+				cancel()
+			}
+		})
+
+		// Wait for context to be cancelled or timeout
+		<-ctx.Done()
+
+		// We should have at least 2 executions to measure intervals
+		require.GreaterOrEqual(t, len(timestamps), 2, "should have at least 2 executions")
+
+		// Calculate intervals between executions
+		for i := 1; i < len(timestamps); i++ {
+			actualInterval := timestamps[i].Sub(timestamps[i-1])
+
+			// Each interval should be at least the minimum interval
+			assert.GreaterOrEqual(t, actualInterval, expectedMinInterval,
+				"execution %d: actual interval %v should be >= minimum interval %v",
+				i, actualInterval, expectedMinInterval)
+
+			// Each interval should be at most interval + maxJitter
+			assert.LessOrEqual(t, actualInterval, expectedMaxInterval,
+				"execution %d: actual interval %v should be <= maximum interval %v",
+				i, actualInterval, expectedMaxInterval)
+
+			t.Logf("execution %d: interval = %v (expected: %v to %v)",
+				i, actualInterval, expectedMinInterval, expectedMaxInterval)
+		}
+	})
+
+	t.Run("should not allow concurrent handler invocations", func(t *testing.T) {
+		interval := 50 * time.Millisecond
+		maxJitter := 0 * time.Millisecond // No jitter for predictable timing
+
+		p := NewPoll(interval, maxJitter)
+
+		var concurrentInvocations int32
+		var maxConcurrentInvocations int32
+		var totalInvocations int32
+
+		ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+		defer cancel()
+
+		p.Subscribe(ctx, func() {
+			// Increment concurrent counter
+			current := atomic.AddInt32(&concurrentInvocations, 1)
+
+			// Track the maximum concurrent invocations we've seen
+			for {
+				max := atomic.LoadInt32(&maxConcurrentInvocations)
+				if current <= max || atomic.CompareAndSwapInt32(&maxConcurrentInvocations, max, current) {
+					break
+				}
+			}
+
+			// Increment total invocations
+			atomic.AddInt32(&totalInvocations, 1)
+
+			// Simulate work that takes longer than the interval
+			// This should cause subsequent timer events to be skipped
+			time.Sleep(150 * time.Millisecond)
+
+			// Decrement concurrent counter
+			atomic.AddInt32(&concurrentInvocations, -1)
+		})
+
+		// Wait for context timeout
+		<-ctx.Done()
+
+		// Verify that we never had more than 1 concurrent invocation
+		maxConcurrent := atomic.LoadInt32(&maxConcurrentInvocations)
+		totalInvoked := atomic.LoadInt32(&totalInvocations)
+
+		assert.Equal(t, int32(1), maxConcurrent,
+			"should never have more than 1 concurrent handler invocation")
+
+		// We should have fewer invocations than if they were all allowed to run
+		// (500ms test duration / 50ms interval = 10 possible, but many should be skipped)
+		assert.Greater(t, int32(10), totalInvoked,
+			"some invocations should have been skipped due to handler still running")
+
+		// But we should have at least some invocations
+		assert.Greater(t, totalInvoked, int32(0),
+			"should have at least some handler invocations")
+
+		t.Logf("total invocations: %d, max concurrent: %d", totalInvoked, maxConcurrent)
+	})
+
+	t.Run("should stop polling when context is cancelled", func(t *testing.T) {
+		interval := 50 * time.Millisecond
+		maxJitter := 0 * time.Millisecond // No jitter for predictable timing
+
+		p := NewPoll(interval, maxJitter)
+
+		var executionCount int32
+
+		ctx, cancel := context.WithCancel(context.Background())
+
+		// Start polling
+		go p.Subscribe(ctx, func() {
+			atomic.AddInt32(&executionCount, 1)
+		})
+
+		// Let it run for a bit to ensure polling starts
+		time.Sleep(150 * time.Millisecond)
+		countBeforeCancel := atomic.LoadInt32(&executionCount)
+
+		// Cancel the context
+		cancel()
+
+		// Wait for any pending executions to complete and verify no new ones occur
+		time.Sleep(200 * time.Millisecond)
+		countAfterCancel := atomic.LoadInt32(&executionCount)
+
+		// Verify that we had some executions before cancellation
+		assert.Greater(t, countBeforeCancel, int32(0),
+			"should have had executions before context cancellation")
+
+		// Verify that no new executions occurred after cancellation
+		assert.Equal(t, countBeforeCancel, countAfterCancel,
+			"should not have new executions after context cancellation")
+
+		t.Logf("executions before cancel: %d, executions after cancel: %d",
+			countBeforeCancel, countAfterCancel)
 	})
 }
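One detail worth noting in the concurrency test above: the peak number of in-flight handlers is tracked with a lock-free compare-and-swap loop rather than a mutex. Isolated, the pattern looks like this (a standalone sketch; the names are illustrative, not from the repo):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// recordMax stores current into max only if it is larger, retrying
// until the CAS succeeds or another goroutine has stored a value at
// least as large. This is the same loop the test uses to track the
// peak number of concurrent handler invocations.
func recordMax(max *int32, current int32) {
	for {
		old := atomic.LoadInt32(max)
		if current <= old || atomic.CompareAndSwapInt32(max, old, current) {
			return
		}
	}
}

func main() {
	var inFlight, peak int32
	var wg sync.WaitGroup

	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			cur := atomic.AddInt32(&inFlight, 1)
			recordMax(&peak, cur)
			atomic.AddInt32(&inFlight, -1)
		}()
	}
	wg.Wait()

	fmt.Println("peak concurrent goroutines:", atomic.LoadInt32(&peak))
}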
