Commit 2618d0c
Retry service-busy errors after a delay (#1174)
Builds on #1167, but adds a delay before retrying service-busy errors. For now, since our server-side RPS quotas are calculated per second, this delays at least 1 second per service-busy error. This is in contrast to the previous behavior, which would have retried up to about a dozen times in the same period; that pattern is what causes service-busy-based retry storms, which in turn generate many more service-busy errors.

---

This also gives us an easy way to make use of "retry after" information in errors we return to the caller, though currently our errors do not contain that. Eventually this should probably come from the server, which has a global view of how many requests this service has sent and can provide a more precise delay to individual callers. E.g. our server-side ratelimiter currently works in 1-second slices, but that isn't guaranteed to stay true. The server could also detect truly large floods of requests and return jittered values larger than 1 second to more powerfully stop the storm, or allow prioritizing some requests (like activity responses) over others simply by returning a lower delay.
1 parent 2bccc5c commit 2618d0c
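As a rough illustration of the mechanism (a standalone sketch, not code from this commit; serviceBusyError and minimumRetryDelay are stand-ins for the generated shared.ServiceBusyError type and the new backoff.ErrRetryableAfter helper shown in the diff below):

package main

import (
    "errors"
    "fmt"
    "time"
)

// serviceBusyError stands in for the generated shared.ServiceBusyError type.
type serviceBusyError struct{}

func (*serviceBusyError) Error() string { return "service busy" }

// minimumRetryDelay mirrors the idea behind ErrRetryableAfter: most errors add
// no extra delay, but service-busy errors demand at least one second.
func minimumRetryDelay(err error) time.Duration {
    var busy *serviceBusyError
    if errors.As(err, &busy) {
        return time.Second
    }
    return 0
}

func main() {
    next := 10 * time.Millisecond // whatever the exponential policy computed
    err := error(&serviceBusyError{})

    // the minimum is *added* to the backoff interval rather than used as a clamp,
    // so repeated busy errors keep spacing requests further apart.
    next += minimumRetryDelay(err)
    fmt.Println(next) // 1.01s
}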

7 files changed: +140 -52 lines

internal/common/backoff/retry.go

Lines changed: 48 additions & 3 deletions
@@ -22,8 +22,11 @@ package backoff

 import (
     "context"
+    "errors"
     "sync"
     "time"
+
+    s "go.uber.org/cadence/.gen/go/shared"
 )

 type (
@@ -87,7 +90,7 @@ func NewConcurrentRetrier(retryPolicy RetryPolicy) *ConcurrentRetrier {
 }

 // Retry function can be used to wrap any call with retry logic using the passed in policy
-func Retry(ctx context.Context, operation Operation, policy RetryPolicy, isRetryable IsRetryable) error {
+func Retry(ctx context.Context, operation Operation, policy RetryPolicy, isRetriable IsRetryable) error {
     var err error
     var next time.Duration

@@ -103,16 +106,40 @@ Retry_Loop:
             return err
         }

-        // Check if the error is retryable
-        if isRetryable != nil && !isRetryable(err) {
+        if !isRetriable(err) {
             return err
         }

+        retryAfter := ErrRetryableAfter(err)
+        // update the time to wait until the next attempt.
+        // as this is a *minimum*, just add it to the current delay time.
+        //
+        // this could be changed to clamp to retryAfter as a minimum.
+        // this is intentionally *not* done here, so repeated service-busy errors are guaranteed
+        // to generate *increasing* amount of time between requests, and not just send N in a row
+        // with 1 second of delay. duplicates imply "still overloaded", so this will hopefully
+        // help reduce the odds of snowballing.
+        // this is a pretty minor thing though, and it should not cause problems if we change it
+        // to make behavior more predictable.
+        next += retryAfter
+
         // check if ctx is done
+        if ctx.Err() != nil {
+            return err
+        }
+
+        // wait for the next retry period (or context timeout)
         if ctxDone := ctx.Done(); ctxDone != nil {
+            // we could check if this is longer than context deadline and immediately fail...
+            // ...but wasting time prevents higher-level retries from trying too early.
+            // this is particularly useful for service-busy, but seems valid for essentially all retried errors.
+            //
+            // this could probably be changed if we get requests for it, but for now it better-protects
+            // the server by preventing "external" retry storms.
             timer := time.NewTimer(next)
             select {
             case <-ctxDone:
+                timer.Stop()
                 return err
             case <-timer.C:
                 continue Retry_Loop
@@ -123,3 +150,21 @@ Retry_Loop:
         time.Sleep(next)
     }
 }
+
+// ErrRetryableAfter returns a minimum delay until the next attempt.
+//
+// for most errors this will be 0, and normal backoff logic will determine
+// the full retry period, but e.g. service busy errors (or any case where the
+// server knows a "time until it is not useful to retry") are safe to assume
+// that a literally immediate retry is *not* going to be useful.
+//
+// note that this is only a minimum, however. longer delays are assumed to
+// be equally valid.
+func ErrRetryableAfter(err error) (retryAfter time.Duration) {
+    if target := (*s.ServiceBusyError)(nil); errors.As(err, &target) {
+        // eventually: return a time-until-retry from the server.
+        // for now though, just ensure at least one second before the next attempt.
+        return time.Second
+    }
+    return 0
+}
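To make the loop's waiting behavior concrete, here is a small self-contained sketch of the same timer-plus-select pattern (waitOrDone is a hypothetical helper, not part of this package): when the minimum delay for a busy error exceeds the caller's context deadline, the context wins and Retry returns after a single attempt, which is what the "timed out due to long minimum delay" test in retry_test.go below exercises.

package main

import (
    "context"
    "fmt"
    "time"
)

// waitOrDone sketches the wait inside Retry's loop: sleep for the computed
// delay, but give up as soon as the context finishes.
func waitOrDone(ctx context.Context, d time.Duration) error {
    t := time.NewTimer(d)
    select {
    case <-ctx.Done():
        t.Stop()
        return ctx.Err()
    case <-t.C:
        return nil
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
    defer cancel()

    // a 1-second service-busy minimum is far longer than the 10ms deadline,
    // so the caller sees the context error instead of sleeping the full second.
    fmt.Println(waitOrDone(ctx, time.Second)) // context deadline exceeded
}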

internal/common/backoff/retry_test.go

Lines changed: 49 additions & 41 deletions
@@ -27,28 +27,51 @@ import (
     "time"

     "github.com/stretchr/testify/assert"
+    "go.uber.org/cadence/.gen/go/shared"
+)
+
+type errCategory int
+
+const (
+    noErr errCategory = iota
+    anyErr
+    serviceBusyErr
 )

 func TestRetry(t *testing.T) {
     t.Parallel()

+    always := func(err error) bool {
+        return true
+    }
+    never := func(err error) bool {
+        return false
+    }
+
     succeedOnAttemptNum := 5
     tests := []struct {
         name        string
         maxAttempts int
+        maxTime     time.Duration // context timeout
         isRetryable func(error) bool

-        shouldError   bool
+        err           errCategory
         expectedCalls int
     }{
-        {"success", 2 * succeedOnAttemptNum, nil, false, succeedOnAttemptNum},
-        {"too many tries", 3, nil, true, 4}, // max 3 retries == 4 calls. must be < succeedOnAttemptNum to work.
-        {"success with always custom retry", 2 * succeedOnAttemptNum, func(err error) bool {
-            return true // retry on all errors, same as no custom retry
-        }, false, succeedOnAttemptNum},
-        {"success with never custom retry", 2 * succeedOnAttemptNum, func(err error) bool {
-            return false // never retry
-        }, true, 1},
+        {"success", 2 * succeedOnAttemptNum, time.Second, always, noErr, succeedOnAttemptNum},
+        {"too many tries", 3, time.Second, always, anyErr, 4}, // max 3 retries == 4 calls. must be < succeedOnAttemptNum to work.
+        {"success with always custom retry", 2 * succeedOnAttemptNum, time.Second, always, noErr, succeedOnAttemptNum},
+        {"success with never custom retry", 2 * succeedOnAttemptNum, time.Second, never, anyErr, 1},
+
+        // elapsed-time-sensitive tests below.
+        // consider raising time granularity if flaky, or we could set up a more complete mock
+        // to resolve flakiness for real, but that's a fair bit more complex.
+
+        // try -> sleep(10ms) -> try -> sleep(20ms) -> try -> sleep(40ms) -> timeout == 3 calls.
+        {"timed out eventually", 5, 50 * time.Millisecond, always, anyErr, 3},
+
+        // try -> sleep(longer than context timeout due to busy err) -> timeout == 1 call.
+        {"timed out due to long minimum delay", 5, 10 * time.Millisecond, always, serviceBusyErr, 1},
     }

     for _, test := range tests {
@@ -63,49 +86,34 @@ func TestRetry(t *testing.T) {
                     return nil
                 }

-                return &someError{}
+                switch test.err {
+                case noErr:
+                    return &someError{} // non-erroring tests should not reach this branch
+                case anyErr:
+                    return &someError{}
+                case serviceBusyErr:
+                    return &shared.ServiceBusyError{}
+                }
+                panic("unreachable")
             }

-            policy := NewExponentialRetryPolicy(1 * time.Millisecond)
-            policy.SetMaximumInterval(5 * time.Millisecond)
+            policy := NewExponentialRetryPolicy(10 * time.Millisecond)
+            policy.SetMaximumInterval(50 * time.Millisecond)
             policy.SetMaximumAttempts(test.maxAttempts)

-            err := Retry(context.Background(), op, policy, test.isRetryable)
-            if test.shouldError {
-                assert.Error(t, err)
-            } else {
+            ctx, cancel := context.WithTimeout(context.Background(), test.maxTime)
+            defer cancel()
+            err := Retry(ctx, op, policy, test.isRetryable)
+            if test.err == noErr {
                 assert.NoError(t, err, "Retry count: %v", i)
+            } else {
+                assert.Error(t, err)
             }
             assert.Equal(t, test.expectedCalls, i, "wrong number of calls")
         })
     }
 }

-func TestNoRetryAfterContextDone(t *testing.T) {
-    t.Parallel()
-    retryCounter := 0
-    op := func() error {
-        retryCounter++
-
-        if retryCounter == 5 {
-            return nil
-        }
-
-        return &someError{}
-    }
-
-    policy := NewExponentialRetryPolicy(10 * time.Millisecond)
-    policy.SetMaximumInterval(50 * time.Millisecond)
-    policy.SetMaximumAttempts(10)
-
-    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
-    defer cancel()
-
-    err := Retry(ctx, op, policy, nil)
-    assert.Error(t, err)
-    assert.True(t, retryCounter >= 2, "retryCounter should be at least 2 but was %d", retryCounter) // verify that we did retry
-}
-
 func TestConcurrentRetrier(t *testing.T) {
     t.Parallel()
     a := assert.New(t)

internal/internal_retry.go

Lines changed: 4 additions & 0 deletions
@@ -113,6 +113,10 @@ func isServiceTransientError(err error) bool {
         return false
     }

+    if target := (*s.ServiceBusyError)(nil); errors.As(err, &target) {
+        return true
+    }
+
     // s.InternalServiceError
     // s.ServiceBusyError (must retry after a delay, but it is transient)
     // server-side-only error types (as they should not reach clients)
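The errors.As check matters because a busy error may arrive wrapped by intermediate layers; a self-contained sketch of that behavior (busyError is a stand-in for the generated *shared.ServiceBusyError):

package main

import (
    "errors"
    "fmt"
)

// busyError stands in for the generated *shared.ServiceBusyError.
type busyError struct{ Message string }

func (e *busyError) Error() string { return e.Message }

func main() {
    // errors.As walks the wrap chain, so a busy error wrapped by another layer
    // is still detected and treated as transient.
    err := fmt.Errorf("poll failed: %w", &busyError{Message: "too many requests"})

    var target *busyError
    fmt.Println(errors.As(err, &target)) // true
}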

internal/internal_retry_test.go

Lines changed: 4 additions & 2 deletions
@@ -47,7 +47,8 @@ func TestErrRetries(t *testing.T) {
             &s.RemoteSyncMatchedError{},
             &s.InternalDataInconsistencyError{},
         } {
-            assert.True(t, isServiceTransientError(err), "%T should be transient", err)
+            retryable := isServiceTransientError(err)
+            assert.True(t, retryable, "%T should be transient", err)
         }
     })
     t.Run("terminal", func(t *testing.T) {
@@ -67,7 +68,8 @@ func TestErrRetries(t *testing.T) {

             errShutdown, // shutdowns can't be stopped
         } {
-            assert.False(t, isServiceTransientError(err), "%T should be fatal", err)
+            retryable := isServiceTransientError(err)
+            assert.False(t, retryable, "%T should be fatal", err)
         }
     })
 }

internal/internal_task_pollers.go

Lines changed: 32 additions & 2 deletions
@@ -765,12 +765,27 @@ func (wtp *workflowTaskPoller) poll(ctx context.Context) (interface{}, error) {

     response, err := wtp.service.PollForDecisionTask(ctx, request, getYarpcCallOptions(wtp.featureFlags)...)
     if err != nil {
-        if isServiceTransientError(err) {
+        retryable := isServiceTransientError(err)
+        if retryable {
             wtp.metricsScope.Counter(metrics.DecisionPollTransientFailedCounter).Inc(1)
         } else {
             wtp.metricsScope.Counter(metrics.DecisionPollFailedCounter).Inc(1)
         }
         wtp.updateBacklog(request.TaskList.GetKind(), 0)
+
+        // pause for the retry delay if present.
+        // failures also have an exponential backoff, implemented at a higher level,
+        // but this ensures a minimum is respected.
+        retryAfter := backoff.ErrRetryableAfter(err)
+        if retryAfter > 0 {
+            t := time.NewTimer(retryAfter)
+            select {
+            case <-ctx.Done():
+                t.Stop()
+            case <-t.C:
+            }
+        }
+
         return nil, err
     }

@@ -990,11 +1005,26 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (*s.PollForActivityTask
     response, err := atp.service.PollForActivityTask(ctx, request, getYarpcCallOptions(atp.featureFlags)...)

     if err != nil {
-        if isServiceTransientError(err) {
+        retryable := isServiceTransientError(err)
+        if retryable {
             atp.metricsScope.Counter(metrics.ActivityPollTransientFailedCounter).Inc(1)
         } else {
             atp.metricsScope.Counter(metrics.ActivityPollFailedCounter).Inc(1)
         }
+
+        // pause for the retry delay if present.
+        // failures also have an exponential backoff, implemented at a higher level,
+        // but this ensures a minimum is respected.
+        retryAfter := backoff.ErrRetryableAfter(err)
+        if retryAfter > 0 {
+            t := time.NewTimer(retryAfter)
+            select {
+            case <-ctx.Done():
+                t.Stop()
+            case <-t.C:
+            }
+        }
+
         return nil, startTime, err
     }
     if response == nil || len(response.TaskToken) == 0 {
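Note the design choice the new comments describe: the poll-level pause and the higher-level exponential backoff happen one after the other, so after a busy error the next poll is issued no sooner than their sum. A hedged sketch of that arithmetic, with hypothetical numbers:

package main

import (
    "fmt"
    "time"
)

func main() {
    // hypothetical values: the poller's minimum pause after a busy error, plus
    // whatever delay the higher-level retrier adds before the next poll attempt.
    busyMinimum := time.Second // what ErrRetryableAfter returns for service-busy
    higherLevelBackoff := 200 * time.Millisecond

    // the delays are sequential, so they add up.
    fmt.Println(busyMinimum + higherLevelBackoff) // 1.2s
}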

internal/internal_worker.go

Lines changed: 1 addition & 1 deletion
@@ -161,7 +161,7 @@ func ensureRequiredParams(params *workerExecutionParameters) {
         config := zap.NewProductionConfig()
         // set default time formatter to "2006-01-02T15:04:05.000Z0700"
         config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
-        //config.Level.SetLevel(zapcore.DebugLevel)
+        // config.Level.SetLevel(zapcore.DebugLevel)
         logger, _ := config.Build()
         params.Logger = logger
         params.Logger.Info("No logger configured for cadence worker. Created default one.")

internal/workflow_replayer.go

Lines changed: 2 additions & 3 deletions
@@ -26,9 +26,6 @@ import (
     "encoding/json"
     "errors"
     "fmt"
-    "io/ioutil"
-    "math"
-
     "github.com/golang/mock/gomock"
     "github.com/opentracing/opentracing-go"
     "github.com/pborman/uuid"
@@ -40,6 +37,8 @@ import (
     "go.uber.org/cadence/internal/common/backoff"
     "go.uber.org/cadence/internal/common/serializer"
     "go.uber.org/zap"
+    "io/ioutil"
+    "math"
 )

 const (
