Skip to content

Commit c15f831

Browse files
authored
pkg/csplugin: use backoff package to retry notifications (#3944)
1 parent afe69fa commit c15f831

File tree

3 files changed

+265
-16
lines changed

3 files changed

+265
-16
lines changed

pkg/csplugin/backoff.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package csplugin
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/sirupsen/logrus"
8+
"github.com/cenkalti/backoff/v5"
9+
)
10+
11+
// backoffFactory produces a fresh backoff.BackOff policy for each delivery,
// so concurrent notifications never share mutable retry state.
type backoffFactory func() backoff.BackOff

const (
	// time to wait before the first retry attempt.
	defaultInitialInterval = 1 * time.Second
	// how much the interval increases after each failure.
	defaultMultiplier = 2.0
	// add jitter to avoid synchronized retries across instances.
	defaultRandomizationFactor = 0.3
	// maximum delay between two consecutive retries.
	defaultMaxInterval = 30 * time.Second
	// total time limit for retries when MaxRetry is not specified.
	defaultMaxElapsedTime = 5 * time.Minute
)
25+
26+
var defaultBackoffFactory = func() backoff.BackOff {
27+
bo := backoff.NewExponentialBackOff()
28+
bo.InitialInterval = defaultInitialInterval
29+
bo.Multiplier = defaultMultiplier
30+
bo.RandomizationFactor = defaultRandomizationFactor
31+
bo.MaxInterval = defaultMaxInterval
32+
return bo
33+
}
34+
35+
// retryWithBackoff retries the given function according to cfg.
36+
func retryWithBackoff(
37+
ctx context.Context,
38+
cfg PluginConfig,
39+
logger logrus.FieldLogger,
40+
fn func(ctx context.Context) error,
41+
newBackoff backoffFactory,
42+
) error {
43+
// the application is closing / reloading, stop notifying
44+
if ctx.Err() != nil {
45+
return ctx.Err()
46+
}
47+
48+
var attempt uint64
49+
onRetry := func(err error, next time.Duration) {
50+
attempt++
51+
logger.WithFields(logrus.Fields{
52+
"attempt": attempt,
53+
"next": next.String(),
54+
}).Warnf("notify attempt failed: %v", err)
55+
}
56+
57+
operation := func() (struct{}, error) {
58+
attemptCtx, cancel := context.WithTimeout(ctx, cfg.TimeOut)
59+
defer cancel()
60+
return struct{}{}, fn(attemptCtx)
61+
}
62+
63+
bo := newBackoff()
64+
65+
options := []backoff.RetryOption{
66+
backoff.WithBackOff(bo),
67+
backoff.WithNotify(onRetry),
68+
}
69+
70+
if cfg.MaxRetry > 0 {
71+
options = append(options, backoff.WithMaxTries(cfg.MaxRetry+1))
72+
} else {
73+
options = append(options, backoff.WithMaxElapsedTime(defaultMaxElapsedTime))
74+
}
75+
76+
_, err := backoff.Retry(ctx, operation, options...)
77+
return err
78+
}

pkg/csplugin/backoff_test.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package csplugin
2+
3+
import (
4+
"context"
5+
"errors"
6+
"io"
7+
"testing"
8+
"time"
9+
10+
"github.com/cenkalti/backoff/v5"
11+
"github.com/sirupsen/logrus"
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
// Sentinel errors for the tests below: errFail marks a transient failure
// that eventually clears, errAlwaysFails marks a permanent one.
var (
	errFail        = errors.New("fail")
	errAlwaysFails = errors.New("always fails")
)
20+
21+
type fakeBackoff struct {
22+
retries []time.Duration
23+
idx int
24+
}
25+
26+
func (f *fakeBackoff) NextBackOff() time.Duration {
27+
if f.idx >= len(f.retries) {
28+
return backoff.Stop
29+
}
30+
d := f.retries[f.idx]
31+
f.idx++
32+
return d
33+
}
34+
35+
func (f *fakeBackoff) Reset() {
36+
f.idx = 0
37+
}
38+
39+
func newFakeBackoff(durations ...time.Duration) backoffFactory {
40+
return func() backoff.BackOff {
41+
return &fakeBackoff{retries: durations}
42+
}
43+
}
44+
45+
func TestRetryWithBackoff_SuccessAfterRetries(t *testing.T) {
46+
discard := logrus.New()
47+
discard.Out = io.Discard
48+
49+
ctx := t.Context()
50+
51+
var calls int
52+
53+
fn := func(_ context.Context) error {
54+
calls++
55+
if calls < 3 {
56+
return errFail
57+
}
58+
59+
return nil
60+
}
61+
62+
cfg := PluginConfig{TimeOut: 50 * time.Millisecond, MaxRetry: 5}
63+
err := retryWithBackoff(ctx, cfg, discard, fn, newFakeBackoff(
64+
0, 0, 0,
65+
))
66+
require.NoError(t, err)
67+
assert.Equal(t, 3, calls)
68+
}
69+
70+
func TestRetryWithBackoff_ExhaustsRetries(t *testing.T) {
71+
discard := logrus.New()
72+
discard.Out = io.Discard
73+
74+
ctx := t.Context()
75+
76+
fn := func(_ context.Context) error {
77+
return errAlwaysFails
78+
}
79+
80+
cfg := PluginConfig{TimeOut: 50 * time.Millisecond, MaxRetry: 2}
81+
err := retryWithBackoff(ctx, cfg, discard, fn, newFakeBackoff(
82+
0, 0, 0,
83+
))
84+
require.ErrorIs(t, err, errAlwaysFails)
85+
}
86+
87+
func TestRetryWithBackoff_ContextCanceled(t *testing.T) {
88+
discard := logrus.New()
89+
discard.Out = io.Discard
90+
91+
ctx, cancel := context.WithCancel(t.Context())
92+
cancel()
93+
94+
var calls int
95+
fn := func(ctx context.Context) error {
96+
calls++
97+
select {
98+
case <-ctx.Done():
99+
return ctx.Err()
100+
default:
101+
return nil
102+
}
103+
}
104+
105+
cfg := PluginConfig{TimeOut: 50 * time.Millisecond, MaxRetry: 3}
106+
err := retryWithBackoff(ctx, cfg, discard, fn, newFakeBackoff(0,0,0))
107+
require.ErrorIs(t, err, context.Canceled)
108+
assert.Zero(t, calls)
109+
110+
// XXX: do we attempt to notify if ctx is canceled?
111+
// assert.Equal(t, 1, calls, "fn should be called once and fail with canceled")
112+
}
113+
114+
func TestRetryWithBackoff_PerAttemptTimeout(t *testing.T) {
115+
discard := logrus.New()
116+
discard.Out = io.Discard
117+
118+
fn := func(ctx context.Context) error {
119+
<-ctx.Done()
120+
return ctx.Err()
121+
}
122+
123+
cfg := PluginConfig{TimeOut: 10 * time.Millisecond, MaxRetry: 1}
124+
err := retryWithBackoff(t.Context(), cfg, discard, fn, newFakeBackoff(0))
125+
require.ErrorIs(t, err, context.DeadlineExceeded)
126+
}
127+
128+
// With MaxRetry = 0 the WithMaxElapsedTime branch is taken instead of a
// retry cap. Here the scripted fake backoff stops after three retries, so
// the loop ends with the last error from fn rather than by elapsed time.
func TestRetryWithBackoff_MaxElapsedTime(t *testing.T) {
	discard := logrus.New()
	discard.Out = io.Discard

	ctx := t.Context()

	fn := func(_ context.Context) error {
		return errAlwaysFails
	}

	// MaxRetry = 0 -> the elapsed-time limit applies; termination in this
	// test actually comes from the fake backoff exhausting its sequence.
	cfg := PluginConfig{TimeOut: 10 * time.Millisecond, MaxRetry: 0}

	err := retryWithBackoff(ctx, cfg, discard, fn, newFakeBackoff(0, 0, 0))
	require.Error(t, err)
	assert.ErrorIs(t, err, errAlwaysFails)
}
146+
147+
func TestRetryWithBackoff_SuccessFirstTry(t *testing.T) {
148+
discard := logrus.New()
149+
discard.Out = io.Discard
150+
151+
ctx := t.Context()
152+
153+
fn := func(_ context.Context) error {
154+
return nil
155+
}
156+
157+
cfg := PluginConfig{TimeOut: 50 * time.Millisecond, MaxRetry: 3}
158+
err := retryWithBackoff(ctx, cfg, discard, fn, newFakeBackoff(0, 0, 0))
159+
require.NoError(t, err)
160+
}
161+

pkg/csplugin/broker.go

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type PluginBroker struct {
5050
pluginKillMethods []func()
5151
pluginProcConfig *csconfig.PluginCfg
5252
pluginsTypesToDispatch map[string]struct{}
53+
newBackoff backoffFactory
5354
}
5455

5556
// holder to determine where to dispatch config and how to format messages
@@ -58,7 +59,7 @@ type PluginConfig struct {
5859
Name string `yaml:"name"`
5960
GroupWait time.Duration `yaml:"group_wait,omitempty"`
6061
GroupThreshold int `yaml:"group_threshold,omitempty"`
61-
MaxRetry int `yaml:"max_retry,omitempty"`
62+
MaxRetry uint `yaml:"max_retry,omitempty"`
6263
TimeOut time.Duration `yaml:"timeout,omitempty"`
6364

6465
Format string `yaml:"format,omitempty"` // specific to notification plugins
@@ -79,10 +80,6 @@ func (pc *PluginConfig) UnmarshalYAML(unmarshal func(any) error) error {
7980
return errors.New("missing required field 'type'")
8081
}
8182

82-
if aux.MaxRetry == 0 {
83-
aux.MaxRetry = 1
84-
}
85-
8683
if aux.TimeOut == 0 {
8784
aux.TimeOut = time.Second * 5
8885
}
@@ -117,6 +114,13 @@ func (pb *PluginBroker) Init(ctx context.Context, pluginCfg *csconfig.PluginCfg,
117114
return nil
118115
}
119116

117+
func (pb *PluginBroker) ensureBackoff() backoffFactory {
118+
if pb.newBackoff == nil {
119+
pb.newBackoff = defaultBackoffFactory
120+
}
121+
return pb.newBackoff
122+
}
123+
120124
func (pb *PluginBroker) Kill() {
121125
for _, kill := range pb.pluginKillMethods {
122126
kill()
@@ -403,27 +407,33 @@ func (pb *PluginBroker) tryNotify(ctx context.Context, pluginName, message strin
403407
}
404408

405409
func (pb *PluginBroker) pushNotificationsToPlugin(ctx context.Context, pluginName string, alerts []*models.Alert) error {
406-
log.WithField("plugin", pluginName).Debugf("pushing %d alerts to plugin", len(alerts))
410+
logger := log.WithField("plugin", pluginName)
411+
412+
logger.Debugf("pushing %d alerts to plugin", len(alerts))
407413

408414
if len(alerts) == 0 {
409415
return nil
410416
}
411417

412-
message, err := FormatAlerts(pb.pluginConfigByName[pluginName].Format, alerts)
418+
pluginCfg := pb.pluginConfigByName[pluginName]
419+
420+
message, err := FormatAlerts(pluginCfg.Format, alerts)
413421
if err != nil {
414-
return err
422+
return fmt.Errorf("format alerts for notification: %w", err)
415423
}
416424

417-
backoffDuration := time.Second
425+
// make sure we have a default or custom backoff
426+
pb.ensureBackoff()
418427

419-
for i := 1; i <= pb.pluginConfigByName[pluginName].MaxRetry; i++ {
420-
if err = pb.tryNotify(ctx, pluginName, message); err == nil {
421-
return nil
428+
err = retryWithBackoff(ctx, pluginCfg, logger, func(ctx context.Context) error {
429+
return pb.tryNotify(ctx, pluginName, message)
430+
}, pb.newBackoff)
431+
if err != nil {
432+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
433+
logger.Warn("delivery canceled during shutdown")
434+
} else {
435+
logger.Errorf("delivery failed after retries: %v", err)
422436
}
423-
424-
log.WithField("plugin", pluginName).Errorf("%s error, retry num %d", err, i)
425-
time.Sleep(backoffDuration)
426-
backoffDuration *= 2
427437
}
428438

429439
return err

0 commit comments

Comments
 (0)