Skip to content

Commit 637deba

Browse files
committed
Use fake clock for pauses in TestWriterImpl_Reconnect/ReconnectOnErrors test
1 parent ee79d1e commit 637deba

File tree

3 files changed

+28
-7
lines changed

3 files changed

+28
-7
lines changed

internal/topic/topicwriterinternal/writer_options.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package topicwriterinternal
33
import (
44
"time"
55

6+
"github.com/jonboulle/clockwork"
7+
68
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
79
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
810
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
@@ -152,3 +154,10 @@ func WithTopic(topic string) PublicWriterOption {
152154
cfg.topic = topic
153155
}
154156
}
157+
158+
// WithClock is private option for tests
159+
func WithClock(clock clockwork.Clock) PublicWriterOption {
160+
return func(cfg *WriterReconnectorConfig) {
161+
cfg.clock = clock
162+
}
163+
}

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ type WriterReconnector struct {
118118
queue messageQueue
119119
background background.Worker
120120
retrySettings topic.RetrySettings
121-
clock clockwork.Clock
122121
writerInstanceID string
123122
sessionID string
124123
semaphore *semaphore.Weighted
@@ -149,7 +148,6 @@ func newWriterReconnectorStopped(
149148
cfg: cfg,
150149
semaphore: semaphore.NewWeighted(int64(cfg.MaxQueueLen)),
151150
queue: newMessageQueue(),
152-
clock: clockwork.NewRealClock(),
153151
lastSeqNo: -1,
154152
firstInitResponseProcessedChan: make(empty.Chan),
155153
encodersMap: NewEncoderMap(),
@@ -189,7 +187,7 @@ func (w *WriterReconnector) fillFields(messages []messageWithDataContent) error
189187
if w.cfg.AutoSetCreatedTime {
190188
if msg.CreatedAt.IsZero() {
191189
if now.IsZero() {
192-
now = w.clock.Now()
190+
now = w.cfg.clock.Now()
193191
}
194192
msg.CreatedAt = now
195193
} else {
@@ -391,17 +389,17 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) {
391389
now := time.Now()
392390
if startOfRetries.IsZero() || topic.CheckResetReconnectionCounters(prevAttemptTime, now, w.cfg.connectTimeout) {
393391
attempt = 0
394-
startOfRetries = w.clock.Now()
392+
startOfRetries = w.cfg.clock.Now()
395393
} else {
396394
attempt++
397395
}
398396
prevAttemptTime = now
399397

400398
if reconnectReason != nil {
401-
retryDuration := w.clock.Since(startOfRetries)
399+
retryDuration := w.cfg.clock.Since(startOfRetries)
402400
if backoff, retry := topic.CheckRetryMode(reconnectReason, w.retrySettings, retryDuration); retry {
403401
delay := backoff.Delay(attempt)
404-
delayTimer := w.clock.NewTimer(delay)
402+
delayTimer := w.cfg.clock.NewTimer(delay)
405403
select {
406404
case <-doneCtx:
407405
delayTimer.Stop()

internal/topic/topicwriterinternal/writer_reconnector_test.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ import (
77
"errors"
88
"fmt"
99
"io"
10+
"math"
1011
"sort"
1112
"sync"
1213
"sync/atomic"
1314
"testing"
1415
"time"
1516

17+
"github.com/jonboulle/clockwork"
1618
"github.com/stretchr/testify/require"
1719
"go.uber.org/mock/gomock"
1820

@@ -426,7 +428,19 @@ func TestWriterImpl_Reconnect(t *testing.T) {
426428
xtest.TestManyTimesWithName(t, "ReconnectOnErrors", func(t testing.TB) {
427429
ctx := xtest.Context(t)
428430

429-
w := newTestWriterStopped()
431+
clock := clockwork.NewFakeClock()
432+
433+
go func() {
434+
for {
435+
if ctx.Err() != nil {
436+
return
437+
}
438+
clock.Advance(time.Second)
439+
time.Sleep(time.Microsecond)
440+
}
441+
}()
442+
443+
w := newTestWriterStopped(WithClock(clock), WithTokenUpdateInterval(time.Duration(math.MaxInt64)))
430444

431445
mc := gomock.NewController(t)
432446

0 commit comments

Comments
 (0)