Skip to content

Commit 7a64090

Browse files
committed
reader and writer use some retry decision functions
1 parent 180bb96 commit 7a64090

File tree

5 files changed

+90
-79
lines changed

5 files changed

+90
-79
lines changed

internal/topic/retriable_error.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package topic
22

33
import (
4+
"fmt"
45
"time"
56

7+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff"
68
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
79
)
810

@@ -15,3 +17,36 @@ func CheckResetReconnectionCounters(lastTry, now time.Time, connectionTimeout ti
1517
const resetAttemptEmpiricalCoefficient = 10
1618
return now.Sub(lastTry) > connectionTimeout*resetAttemptEmpiricalCoefficient
1719
}
20+
21+
func CheckRetryMode(err error, isPreCheck bool, currentAttempt int, customCheckFunc PublicCheckRetryFunc) (
22+
_ backoff.Backoff,
23+
isRetriable bool,
24+
) {
25+
isRetriable = true
26+
27+
decision := PublicRetryDecisionDefault
28+
if customCheckFunc != nil {
29+
decision = customCheckFunc(NewCheckRetryArgs(isPreCheck, currentAttempt, err))
30+
}
31+
32+
switch decision {
33+
case PublicRetryDecisionDefault:
34+
isRetriable = IsRetryableError(err)
35+
case PublicRetryDecisionRetry:
36+
isRetriable = true
37+
case PublicRetryDecisionStop:
38+
isRetriable = false
39+
default:
40+
panic(fmt.Errorf("unexpected retry decision: %v", decision))
41+
}
42+
43+
if !isRetriable {
44+
return nil, false
45+
}
46+
47+
if retry.Check(err).BackoffType() == backoff.TypeFast {
48+
return backoff.Fast, true
49+
}
50+
51+
return backoff.Slow, true
52+
}

internal/topic/topicreaderinternal/stream_reconnector.go

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1919
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
2020
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
21-
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
2221
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2322
)
2423

@@ -254,46 +253,15 @@ func (r *readerReconnector) reconnect(ctx context.Context, oldReader batchedStre
254253
}
255254

256255
func (r *readerReconnector) isRetriableError(err error) bool {
257-
_, res := r.checkErrRetryModeLogic(true, 0, err)
256+
_, res := topic.CheckRetryMode(err, true, 0, r.retrySettings.CheckError)
258257
return res
259258
}
260259

261260
func (r *readerReconnector) checkErrRetryMode(attempts int, err error) (
262261
backoffType backoff.Backoff,
263262
isRetriableErr bool,
264263
) {
265-
return r.checkErrRetryModeLogic(false, attempts, err)
266-
}
267-
268-
func (r *readerReconnector) checkErrRetryModeLogic(preCheck bool, attempts int, err error) (
269-
backoffType backoff.Backoff,
270-
isRetriableErr bool,
271-
) {
272-
isRetriableErr = true
273-
if r.retrySettings.CheckError != nil {
274-
switch decision := r.retrySettings.CheckError(topic.NewCheckRetryArgs(preCheck, attempts, err)); decision {
275-
case topic.PublicRetryDecisionDefault:
276-
isRetriableErr = topic.IsRetryableError(err)
277-
case topic.PublicRetryDecisionRetry:
278-
isRetriableErr = true
279-
case topic.PublicRetryDecisionStop:
280-
isRetriableErr = false
281-
default:
282-
panic(fmt.Errorf("unexpected retry decision: %v", decision))
283-
}
284-
}
285-
if !isRetriableErr {
286-
return nil, false
287-
}
288-
if preCheck {
289-
return nil, true
290-
}
291-
292-
if retry.Check(err).BackoffType() == backoff.TypeFast {
293-
return backoff.Fast, true
294-
}
295-
296-
return backoff.Slow, true
264+
return topic.CheckRetryMode(err, false, attempts, r.retrySettings.CheckError)
297265
}
298266

299267
func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err error) {

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515

1616
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
1717
"github.com/ydb-platform/ydb-go-sdk/v3/internal/background"
18-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff"
1918
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
2019
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
2120
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
@@ -104,7 +103,8 @@ func newWriterReconnectorConfig(options ...PublicWriterOption) WriterReconnector
104103
}
105104

106105
type WriterReconnector struct {
107-
cfg WriterReconnectorConfig
106+
cfg WriterReconnectorConfig
107+
retrySettings topic.RetrySettings
108108

109109
semaphore *semaphore.Weighted
110110
queue messageQueue
@@ -337,8 +337,8 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) {
337337
streamCtxCancel(xerrors.WithStackTrace(errCloseWriterReconnectorConnectionLoop))
338338
}()
339339

340-
var prevConnectionError error
341-
var prevConnectionTime time.Time
340+
var reconnectReason error
341+
var prevAttemptTime time.Time
342342

343343
for {
344344
if ctx.Err() != nil {
@@ -348,54 +348,61 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) {
348348
streamCtxCancel(xerrors.WithStackTrace(errCloseWriterReconnectorReconnect))
349349
streamCtx, streamCtxCancel = createStreamContext()
350350

351-
attempt++
352-
353-
// delay if reconnect
354-
if attempt > 1 {
355-
delay := backoff.Fast.Delay(attempt - 2)
356-
select {
357-
case <-doneCtx:
358-
return
359-
case <-w.clock.After(delay):
360-
// pass
361-
}
362-
}
363-
364-
traceOnDone := trace.TopicOnWriterReconnect(
365-
w.cfg.tracer,
366-
w.writerInstanceID,
367-
w.cfg.topic,
368-
w.cfg.producerID,
369-
attempt,
370-
)
371-
372-
stream, trackedErr := w.connectWithTimeout(streamCtx)
373-
traceOnDone(trackedErr)
374-
375-
var writer *SingleStreamWriter
376-
if trackedErr == nil {
351+
now := time.Now()
352+
if topic.CheckResetReconnectionCounters(prevAttemptTime, now, w.cfg.connectTimeout) {
377353
attempt = 0
378-
w.queue.ResetSentProgress()
379-
writer, trackedErr = NewSingleStreamWriter(ctx, w.createWriterStreamConfig(stream))
354+
} else {
355+
attempt++
380356
}
381-
if trackedErr == nil {
382-
w.onWriterChange(writer)
383-
trackedErr = writer.CloseWait(ctx)
357+
prevAttemptTime = now
358+
359+
if reconnectReason != nil {
360+
if backoff, retry := topic.CheckRetryMode(reconnectReason, false, attempt, w.retrySettings.CheckError); retry {
361+
delay := backoff.Delay(attempt)
362+
select {
363+
case <-doneCtx:
364+
return
365+
case <-w.clock.After(delay):
366+
// pass
367+
}
368+
} else {
369+
_ = w.close(ctx, reconnectReason)
370+
return
371+
}
384372
}
385-
w.onWriterChange(nil)
386373

387-
if !w.isRetriableErr(trackedErr) {
388-
closeCtx, cancel := context.WithCancel(ctx)
389-
cancel()
390-
_ = w.close(closeCtx, trackedErr)
391-
return
374+
writer, err := w.startWriteStream(ctx, streamCtx, attempt)
375+
w.onWriterChange(writer)
376+
if err == nil {
377+
reconnectReason = writer.WaitClose(ctx)
378+
} else {
379+
reconnectReason = err
392380
}
393-
// next iteration
394381
}
395382
}
396383

397-
func (w *WriterReconnector) isRetriableErr(err error) bool {
398-
return topic.IsRetryableError(err)
384+
func (w *WriterReconnector) startWriteStream(ctx context.Context, streamCtx context.Context, attempt int) (
385+
writer *SingleStreamWriter,
386+
err error,
387+
) {
388+
traceOnDone := trace.TopicOnWriterReconnect(
389+
w.cfg.tracer,
390+
w.writerInstanceID,
391+
w.cfg.topic,
392+
w.cfg.producerID,
393+
attempt,
394+
)
395+
defer func() {
396+
traceOnDone(err)
397+
}()
398+
399+
stream, err := w.connectWithTimeout(streamCtx)
400+
if err != nil {
401+
return nil, err
402+
}
403+
404+
w.queue.ResetSentProgress()
405+
return NewSingleStreamWriter(ctx, w.createWriterStreamConfig(stream))
399406
}
400407

401408
func (w *WriterReconnector) needReceiveLastSeqNo() bool {

internal/topic/topicwriterinternal/writer_reconnector_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,7 @@ func newTestEnv(t testing.TB, options *testEnvOptions) *testEnv {
774774
require.NoError(t, res.writer.waitFirstInitResponse(res.ctx))
775775

776776
t.Cleanup(func() {
777+
res.writer.close(context.Background(), errors.New("stop writer test environment"))
777778
close(res.stopReadEvents)
778779
<-streamClosed
779780
})

internal/topic/topicwriterinternal/writer_single_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (w *SingleStreamWriter) close(ctx context.Context, reason error) error {
103103
return resErr
104104
}
105105

106-
func (w *SingleStreamWriter) CloseWait(ctx context.Context) error {
106+
func (w *SingleStreamWriter) WaitClose(ctx context.Context) error {
107107
select {
108108
case <-ctx.Done():
109109
return ctx.Err()

0 commit comments

Comments
 (0)