Skip to content

Commit 180bb96

Browse files
committed
sync
1 parent bea8ea9 commit 180bb96

File tree

3 files changed

+12
-5
lines changed

3 files changed

+12
-5
lines changed

internal/topic/retriable_error.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
package topic
22

33
import (
4+
"time"
5+
46
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
57
)
68

79
func IsRetryableError(err error) bool {
810
mode := retry.Check(err)
911
return mode.MustRetry(true)
1012
}
13+
14+
func CheckResetReconnectionCounters(lastTry, now time.Time, connectionTimeout time.Duration) bool {
15+
const resetAttemptEmpiricalCoefficient = 10
16+
return now.Sub(lastTry) > connectionTimeout*resetAttemptEmpiricalCoefficient
17+
}

internal/topic/topicreaderinternal/stream_reconnector.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -169,15 +169,12 @@ func (r *readerReconnector) reconnectionLoop(ctx context.Context) {
169169
attempt := 0
170170
for {
171171
now := r.clock.Now()
172-
sinceLastTime := now.Sub(lastTime)
173-
lastTime = now
174-
175-
const resetAttemptEmpiricalCoefficient = 10
176-
if sinceLastTime > r.connectTimeout*resetAttemptEmpiricalCoefficient {
172+
if topic.CheckResetReconnectionCounters(lastTime, now, r.connectTimeout) {
177173
attempt = 0
178174
} else {
179175
attempt++
180176
}
177+
lastTime = now
181178

182179
var request reconnectRequest
183180
select {

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,9 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) {
337337
streamCtxCancel(xerrors.WithStackTrace(errCloseWriterReconnectorConnectionLoop))
338338
}()
339339

340+
var prevConnectionError error
341+
var prevConnectionTime time.Time
342+
340343
for {
341344
if ctx.Err() != nil {
342345
return

0 commit comments

Comments
 (0)