Skip to content

Commit 25f2de4

Browse files
authored
Merge pull request #499 Retry policy
2 parents 63b09aa + d606c50 commit 25f2de4

File tree

14 files changed

+514
-90
lines changed

14 files changed

+514
-90
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added retry policy options for topics: WithReaderCheckRetryErrorFunction, WithReaderStartTimeout, WithWriterCheckRetryErrorFunction, WithWriterStartTimeout
2+
13
## v3.41.0
24
* Added option for set interval of auth token update in topic streams
35
* Supported internal allocator in `{session,statement}.Execute` for decrease memory usage

internal/topic/retriable_error.go

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,84 @@
11
package topic
22

33
import (
4+
"fmt"
5+
"time"
6+
7+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff"
48
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
59
)
610

7-
func IsRetryableError(err error) bool {
11+
const (
12+
DefaultStartTimeout = time.Minute
13+
)
14+
15+
type RetrySettings struct {
16+
StartTimeout time.Duration // Full retry timeout
17+
CheckError PublicCheckErrorRetryFunction
18+
}
19+
20+
type PublicCheckErrorRetryFunction func(errInfo PublicCheckErrorRetryArgs) PublicCheckRetryResult
21+
22+
type PublicCheckErrorRetryArgs struct {
23+
Error error
24+
}
25+
26+
func NewCheckRetryArgs(err error) PublicCheckErrorRetryArgs {
27+
return PublicCheckErrorRetryArgs{
28+
Error: err,
29+
}
30+
}
31+
32+
type PublicCheckRetryResult struct {
33+
val int
34+
}
35+
36+
var (
37+
PublicRetryDecisionDefault = PublicCheckRetryResult{val: 0}
38+
PublicRetryDecisionRetry = PublicCheckRetryResult{val: 1}
39+
PublicRetryDecisionStop = PublicCheckRetryResult{val: 2}
40+
)
41+
42+
func CheckResetReconnectionCounters(lastTry, now time.Time, connectionTimeout time.Duration) bool {
43+
const resetAttemptEmpiricalCoefficient = 10
44+
return now.Sub(lastTry) > connectionTimeout*resetAttemptEmpiricalCoefficient
45+
}
46+
47+
func CheckRetryMode(err error, settings RetrySettings, retriesDuration time.Duration) (
48+
_ backoff.Backoff,
49+
isRetriable bool,
50+
) {
51+
if retriesDuration > settings.StartTimeout {
52+
return nil, false
53+
}
54+
55+
isRetriable = true
56+
857
mode := retry.Check(err)
9-
return mode.MustRetry(true)
58+
59+
decision := PublicRetryDecisionDefault
60+
if settings.CheckError != nil {
61+
decision = settings.CheckError(NewCheckRetryArgs(err))
62+
}
63+
64+
switch decision {
65+
case PublicRetryDecisionDefault:
66+
isRetriable = mode.MustRetry(true)
67+
case PublicRetryDecisionRetry:
68+
isRetriable = true
69+
case PublicRetryDecisionStop:
70+
isRetriable = false
71+
default:
72+
panic(fmt.Errorf("unexpected retry decision: %v", decision))
73+
}
74+
75+
if !isRetriable {
76+
return nil, false
77+
}
78+
79+
if mode.BackoffType() == backoff.TypeFast {
80+
return backoff.Fast, true
81+
}
82+
83+
return backoff.Slow, true
1084
}
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
package topic
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
9+
grpcCodes "google.golang.org/grpc/codes"
10+
11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
13+
)
14+
15+
func TestCheckRetryMode(t *testing.T) {
16+
fastError := xerrors.Transport(xerrors.WithCode(grpcCodes.Unavailable))
17+
slowError := xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_OVERLOADED))
18+
unretriable := xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_UNAUTHORIZED))
19+
20+
table := []struct {
21+
name string
22+
err error
23+
settings RetrySettings
24+
duration time.Duration
25+
resBackoff backoff.Backoff
26+
resRetriable bool
27+
}{
28+
{
29+
name: "OK",
30+
err: nil,
31+
settings: RetrySettings{},
32+
duration: 0,
33+
resBackoff: nil,
34+
resRetriable: false,
35+
},
36+
{
37+
name: "RetryRetriableErrorFast",
38+
err: fastError,
39+
settings: RetrySettings{},
40+
duration: 0,
41+
resBackoff: backoff.Fast,
42+
resRetriable: true,
43+
},
44+
{
45+
name: "RetryRetriableErrorFastWithTimeout",
46+
err: fastError,
47+
settings: RetrySettings{
48+
StartTimeout: time.Second,
49+
},
50+
duration: time.Second * 2,
51+
resBackoff: nil,
52+
resRetriable: false,
53+
},
54+
{
55+
name: "RetryRetriableErrorSlow",
56+
err: slowError,
57+
settings: RetrySettings{},
58+
duration: 0,
59+
resBackoff: backoff.Slow,
60+
resRetriable: true,
61+
},
62+
{
63+
name: "RetryRetriableErrorSlowWithTimeout",
64+
err: slowError,
65+
settings: RetrySettings{
66+
StartTimeout: time.Second,
67+
},
68+
duration: time.Second * 2,
69+
resBackoff: nil,
70+
resRetriable: false,
71+
},
72+
{
73+
name: "UnretriableError",
74+
err: unretriable,
75+
settings: RetrySettings{},
76+
duration: 0,
77+
resBackoff: nil,
78+
resRetriable: false,
79+
},
80+
{
81+
name: "UserOverrideFastErrorDefault",
82+
err: fastError,
83+
settings: RetrySettings{
84+
CheckError: func(errInfo PublicCheckErrorRetryArgs) PublicCheckRetryResult {
85+
return PublicRetryDecisionDefault
86+
},
87+
},
88+
duration: 0,
89+
resBackoff: backoff.Fast,
90+
resRetriable: true,
91+
},
92+
{
93+
name: "UserOverrideFastErrorRetry",
94+
err: fastError,
95+
settings: RetrySettings{
96+
CheckError: func(errInfo PublicCheckErrorRetryArgs) PublicCheckRetryResult {
97+
return PublicRetryDecisionRetry
98+
},
99+
},
100+
duration: 0,
101+
resBackoff: backoff.Fast,
102+
resRetriable: true,
103+
},
104+
{
105+
name: "UserOverrideFastErrorStop",
106+
err: fastError,
107+
settings: RetrySettings{
108+
CheckError: func(errInfo PublicCheckErrorRetryArgs) PublicCheckRetryResult {
109+
return PublicRetryDecisionStop
110+
},
111+
},
112+
duration: 0,
113+
resBackoff: nil,
114+
resRetriable: false,
115+
},
116+
{
117+
name: "UserOverrideSlowErrorDefault",
118+
err: slowError,
119+
settings: RetrySettings{
120+
CheckError: func(errInfo PublicCheckErrorRetryArgs) PublicCheckRetryResult {
121+
return PublicRetryDecisionDefault
122+
},
123+
},
124+
duration: 0,
125+
resBackoff: backoff.Slow,
126+
resRetriable: true,
127+
},
128+
{
129+
name: "UserOverrideSlowErrorRetry",
130+
err: slowError,
131+
settings: RetrySettings{
132+
CheckError: func(errInfo PublicCheckErrorRetryArgs) PublicCheckRetryResult {
133+
return PublicRetryDecisionRetry
134+
},
135+
},
136+
duration: 0,
137+
resBackoff: backoff.Slow,
138+
resRetriable: true,
139+
},
140+
{
141+
name: "UserOverrideSlowErrorStop",
142+
err: slowError,
143+
settings: RetrySettings{
144+
CheckError: func(errInfo PublicCheckErrorRetryArgs) PublicCheckRetryResult {
145+
return PublicRetryDecisionStop
146+
},
147+
},
148+
duration: 0,
149+
resBackoff: nil,
150+
resRetriable: false,
151+
},
152+
{
153+
name: "UserOverrideUnretriableErrorDefault",
154+
err: xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_UNAUTHORIZED)),
155+
settings: RetrySettings{
156+
CheckError: func(errInfo PublicCheckErrorRetryArgs) PublicCheckRetryResult {
157+
return PublicRetryDecisionDefault
158+
},
159+
},
160+
duration: 0,
161+
resBackoff: nil,
162+
resRetriable: false,
163+
},
164+
{
165+
name: "UserOverrideUnretriableErrorRetry",
166+
err: xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_UNAUTHORIZED)),
167+
settings: RetrySettings{
168+
CheckError: func(errInfo PublicCheckErrorRetryArgs) PublicCheckRetryResult {
169+
return PublicRetryDecisionRetry
170+
},
171+
},
172+
duration: 0,
173+
resBackoff: backoff.Slow,
174+
resRetriable: true,
175+
},
176+
{
177+
name: "UserOverrideUnretriableErrorStop",
178+
err: xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_UNAUTHORIZED)),
179+
settings: RetrySettings{
180+
CheckError: func(errInfo PublicCheckErrorRetryArgs) PublicCheckRetryResult {
181+
return PublicRetryDecisionStop
182+
},
183+
},
184+
duration: 0,
185+
resBackoff: nil,
186+
resRetriable: false,
187+
},
188+
{
189+
name: "UserOverrideFastErrorRetryWithTimeout",
190+
err: xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_UNAUTHORIZED)),
191+
settings: RetrySettings{
192+
StartTimeout: time.Second,
193+
CheckError: func(errInfo PublicCheckErrorRetryArgs) PublicCheckRetryResult {
194+
return PublicRetryDecisionRetry
195+
},
196+
},
197+
duration: time.Second * 2,
198+
resBackoff: nil,
199+
resRetriable: false,
200+
},
201+
}
202+
203+
for _, test := range table {
204+
t.Run(test.name, func(t *testing.T) {
205+
resBackoff, retriable := CheckRetryMode(test.err, test.settings, test.duration)
206+
require.Equal(t, test.resBackoff, resBackoff)
207+
require.Equal(t, test.resRetriable, retriable)
208+
})
209+
}
210+
}

internal/topic/topicclientinternal/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ func (c *Client) StartReader(
201201
topicoptions.WithCommonConfig(c.cfg.Common),
202202
topicreaderinternal.WithCredentials(c.cred),
203203
topicreaderinternal.WithTrace(c.cfg.Trace),
204+
topicoptions.WithReaderStartTimeout(topic.DefaultStartTimeout),
204205
}
205206
opts = append(defaultOpts, opts...)
206207

internal/topic/topicreaderinternal/reader.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ import (
77
"time"
88

99
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
10-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff"
1110
"github.com/ydb-platform/ydb-go-sdk/v3/internal/clone"
1211
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
1312
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicreader"
13+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic"
1414
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1515
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1616
)
@@ -88,7 +88,13 @@ func NewReader(
8888
}
8989

9090
res := Reader{
91-
reader: newReaderReconnector(readerConnector, cfg.OperationTimeout(), cfg.Tracer, cfg.BaseContext),
91+
reader: newReaderReconnector(
92+
readerConnector,
93+
cfg.OperationTimeout(),
94+
cfg.RetrySettings,
95+
cfg.Tracer,
96+
cfg.BaseContext,
97+
),
9298
defaultBatchConfig: cfg.DefaultBatchConfig,
9399
tracer: cfg.Tracer,
94100
}
@@ -178,10 +184,9 @@ func (r *Reader) CommitRanges(ctx context.Context, ranges []PublicCommitRange) e
178184
type ReaderConfig struct {
179185
config.Common
180186

187+
RetrySettings topic.RetrySettings
181188
DefaultBatchConfig ReadMessageBatchOptions
182189
topicStreamReaderConfig
183-
184-
reconnectionBackoff backoff.Backoff
185190
}
186191

187192
// PublicReaderOption
@@ -213,7 +218,6 @@ func convertNewParamsToStreamConfig(
213218
) (cfg ReaderConfig) {
214219
cfg.topicStreamReaderConfig = newTopicStreamReaderConfig()
215220
cfg.Consumer = consumer
216-
cfg.reconnectionBackoff = backoff.Fast
217221

218222
// make own copy, for prevent changing internal states if readSelectors will change outside
219223
cfg.ReadSelectors = make([]PublicReadSelector, len(readSelectors))

0 commit comments

Comments
 (0)