Skip to content

Commit bea8ea9

Browse files
committed
use retry settings in reader
1 parent 378864f commit bea8ea9

File tree

2 files changed

+62
-12
lines changed

2 files changed

+62
-12
lines changed

internal/topic/retry_settings.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,17 @@ type RetrySettings struct {
1010
type PublicCheckRetryFunc func(errInfo PublicCheckRetryArgs) PublicCheckRetryResult
1111

1212
type PublicCheckRetryArgs struct {
13-
Attempt int
14-
Error error
13+
IsPreCheck bool
14+
Attempt int
15+
Error error
16+
}
17+
18+
func NewCheckRetryArgs(preCheck bool, attempts int, err error) PublicCheckRetryArgs {
19+
return PublicCheckRetryArgs{
20+
IsPreCheck: preCheck,
21+
Attempt: attempts,
22+
Error: err,
23+
}
1524
}
1625

1726
type PublicCheckRetryResult int

internal/topic/topicreaderinternal/stream_reconnector.go

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1919
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
2020
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
21+
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
2122
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2223
)
2324

@@ -32,8 +33,9 @@ type readerReconnector struct {
3233
clock clockwork.Clock
3334
background background.Worker
3435

35-
tracer trace.Topic
36-
baseContext context.Context
36+
tracer trace.Topic
37+
baseContext context.Context
38+
retrySettings topic.RetrySettings
3739

3840
readerConnect readerConnectFunc
3941

@@ -94,7 +96,7 @@ func (r *readerReconnector) ReadMessageBatch(ctx context.Context, opts ReadMessa
9496
attempt++
9597
stream, err := r.stream(ctx)
9698
switch {
97-
case r.isRetriableErr(err):
99+
case r.isRetriableError(err):
98100
r.fireReconnectOnRetryableError(stream, err)
99101
runtime.Gosched()
100102
continue
@@ -105,7 +107,7 @@ func (r *readerReconnector) ReadMessageBatch(ctx context.Context, opts ReadMessa
105107
}
106108

107109
res, err := stream.ReadMessageBatch(ctx, opts)
108-
if r.isRetriableErr(err) {
110+
if r.isRetriableError(err) {
109111
r.fireReconnectOnRetryableError(stream, err)
110112
runtime.Gosched()
111113
continue
@@ -186,8 +188,8 @@ func (r *readerReconnector) reconnectionLoop(ctx context.Context) {
186188
// pass
187189
}
188190

189-
if request.reason != nil && r.isRetriableErr(request.reason) {
190-
delay := backoff.Fast.Delay(attempt)
191+
if backoff, isRetriableErr := r.checkErrRetryMode(attempt, request.reason); request.reason != nil && isRetriableErr {
192+
delay := backoff.Delay(attempt)
191193

192194
select {
193195
case <-ctx.Done():
@@ -237,7 +239,7 @@ func (r *readerReconnector) reconnect(ctx context.Context, oldReader batchedStre
237239

238240
newStream, err := r.connectWithTimeout()
239241

240-
if r.isRetriableErr(err) {
242+
if r.isRetriableError(err) {
241243
go func(reason error) {
242244
// guarantee write reconnect signal to channel
243245
r.reconnectFromBadStream <- newReconnectRequest(oldReader, reason)
@@ -254,8 +256,47 @@ func (r *readerReconnector) reconnect(ctx context.Context, oldReader batchedStre
254256
return err
255257
}
256258

257-
func (r *readerReconnector) isRetriableErr(err error) bool {
258-
return topic.IsRetryableError(err)
259+
func (r *readerReconnector) isRetriableError(err error) bool {
260+
_, res := r.checkErrRetryModeLogic(true, 0, err)
261+
return res
262+
}
263+
264+
func (r *readerReconnector) checkErrRetryMode(attempts int, err error) (
265+
backoffType backoff.Backoff,
266+
isRetriableErr bool,
267+
) {
268+
return r.checkErrRetryModeLogic(false, attempts, err)
269+
}
270+
271+
func (r *readerReconnector) checkErrRetryModeLogic(preCheck bool, attempts int, err error) (
272+
backoffType backoff.Backoff,
273+
isRetriableErr bool,
274+
) {
275+
isRetriableErr = true
276+
if r.retrySettings.CheckError != nil {
277+
switch decision := r.retrySettings.CheckError(topic.NewCheckRetryArgs(preCheck, attempts, err)); decision {
278+
case topic.PublicRetryDecisionDefault:
279+
isRetriableErr = topic.IsRetryableError(err)
280+
case topic.PublicRetryDecisionRetry:
281+
isRetriableErr = true
282+
case topic.PublicRetryDecisionStop:
283+
isRetriableErr = false
284+
default:
285+
panic(fmt.Errorf("unexpected retry decision: %v", decision))
286+
}
287+
}
288+
if !isRetriableErr {
289+
return nil, false
290+
}
291+
if preCheck {
292+
return nil, true
293+
}
294+
295+
if retry.Check(err).BackoffType() == backoff.TypeFast {
296+
return backoff.Fast, true
297+
}
298+
299+
return backoff.Slow, true
259300
}
260301

261302
func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err error) {
@@ -297,7 +338,7 @@ func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err err
297338
}
298339

299340
func (r *readerReconnector) fireReconnectOnRetryableError(stream batchedStreamReader, err error) {
300-
if !r.isRetriableErr(err) {
341+
if !r.isRetriableError(err) {
301342
return
302343
}
303344

0 commit comments

Comments
 (0)