Skip to content

Commit d606c50

Browse files
committed
fix default settings to writer in tests
1 parent f46f2f3 commit d606c50

File tree

6 files changed

+23
-13
lines changed

6 files changed

+23
-13
lines changed

internal/topic/retriable_error.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
)
1010

1111
const (
12-
DefaultConnectionTimeout = time.Minute
12+
DefaultStartTimeout = time.Minute
1313
)
1414

1515
type RetrySettings struct {

internal/topic/topicclientinternal/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func (c *Client) StartReader(
201201
topicoptions.WithCommonConfig(c.cfg.Common),
202202
topicreaderinternal.WithCredentials(c.cred),
203203
topicreaderinternal.WithTrace(c.cfg.Trace),
204-
topicoptions.WithReaderStartTimeout(topic.DefaultConnectionTimeout),
204+
topicoptions.WithReaderStartTimeout(topic.DefaultStartTimeout),
205205
}
206206
opts = append(defaultOpts, opts...)
207207

internal/topic/topicwriterinternal/writer_options.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ func WithSessionMeta(meta map[string]string) PublicWriterOption {
125125
}
126126
}
127127

128+
func WithStartTimeout(timeout time.Duration) PublicWriterOption {
129+
return func(cfg *WriterReconnectorConfig) {
130+
cfg.RetrySettings.StartTimeout = timeout
131+
}
132+
}
133+
128134
func WithWaitAckOnWrite(val bool) PublicWriterOption {
129135
return func(cfg *WriterReconnectorConfig) {
130136
cfg.WaitServerAck = val

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func newWriterReconnectorConfig(options ...PublicWriterOption) WriterReconnector
8585
MaxMessageSize: 50 * 1024 * 1024,
8686
MaxQueueLen: 1000,
8787
RetrySettings: topic.RetrySettings{
88-
StartTimeout: topic.DefaultConnectionTimeout,
88+
StartTimeout: topic.DefaultStartTimeout,
8989
},
9090
}
9191
if cfg.compressorCount == 0 {
@@ -141,6 +141,7 @@ func newWriterReconnectorStopped(cfg WriterReconnectorConfig) *WriterReconnector
141141
firstInitResponseProcessedChan: make(empty.Chan),
142142
encodersMap: NewEncoderMap(),
143143
writerInstanceID: writerInstanceID.String(),
144+
retrySettings: cfg.RetrySettings,
144145
}
145146

146147
res.queue.OnAckReceived = res.onAckReceived
@@ -356,7 +357,7 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) {
356357
now := time.Now()
357358
if topic.CheckResetReconnectionCounters(prevAttemptTime, now, w.cfg.connectTimeout) {
358359
attempt = 0
359-
startOfRetries = time.Now()
360+
startOfRetries = w.clock.Now()
360361
} else {
361362
attempt++
362363
}

internal/topic/topicwriterinternal/writer_reconnector_test.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ func TestWriterImpl_Reconnect(t *testing.T) {
360360
require.ErrorIs(t, w.background.CloseReason(), testErr)
361361
})
362362

363-
t.Run("ReconnectOnErrors", func(t *testing.T) {
363+
xtest.TestManyTimesWithName(t, "ReconnectOnErrors", func(t testing.TB) {
364364
ctx := xtest.Context(t)
365365

366366
w := newTestWriterStopped()
@@ -400,10 +400,6 @@ func TestWriterImpl_Reconnect(t *testing.T) {
400400
}
401401

402402
strm2InitSent := make(empty.Chan)
403-
go func() {
404-
err := w.Write(ctx, newTestMessages(1))
405-
require.NoError(t, err)
406-
}()
407403

408404
strm2 := newStream("strm2", func() {
409405
close(strm2InitSent)
@@ -446,7 +442,16 @@ func TestWriterImpl_Reconnect(t *testing.T) {
446442
return res.stream, res.connectionError
447443
}
448444

449-
w.connectionLoop(ctx)
445+
connectionLoopStopped := make(empty.Chan)
446+
go func() {
447+
defer close(connectionLoopStopped)
448+
w.connectionLoop(ctx)
449+
}()
450+
451+
err := w.Write(ctx, newTestMessages(1))
452+
require.NoError(t, err)
453+
454+
xtest.WaitChannelClosed(t, connectionLoopStopped)
450455
})
451456
}
452457

topic/topicoptions/topicoptions_writer.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,7 @@ func WithWriterSetAutoCreatedAt(val bool) WriterOption {
183183
//
184184
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
185185
func WithWriterStartTimeout(timeout time.Duration) WriterOption {
186-
return func(cfg *topicwriterinternal.WriterReconnectorConfig) {
187-
cfg.RetrySettings.StartTimeout = timeout
188-
}
186+
return topicwriterinternal.WithStartTimeout(timeout)
189187
}
190188

191189
// WithWriterTrace

0 commit comments

Comments
 (0)