Skip to content

Commit 90a231a

Browse files
authored
Merge pull request #803 added WaitInit to Writer and Reader
2 parents 2a743f6 + bb52406 commit 90a231a

17 files changed

+350
-4
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added `WaitInit` and `WaitInitInfo` method to the topic reader and writer
2+
13
## v3.52.2
24
* Removed support of placeholder "_" for ignoring columns in `database/sql` result sets
35

internal/topic/topicreaderinternal/batched_stream_reader_interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
//go:generate mockgen -source batched_stream_reader_interface.go -destination batched_stream_reader_mock_test.go -package topicreaderinternal -write_package_comment=false
88

99
type batchedStreamReader interface {
10+
WaitInit(ctx context.Context) error
1011
ReadMessageBatch(ctx context.Context, opts ReadMessageBatchOptions) (*PublicBatch, error)
1112
Commit(ctx context.Context, commitRange commitRange) error
1213
CloseWithError(ctx context.Context, err error) error

internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/topic/topicreaderinternal/reader.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,10 @@ func NewReader(
119119
return res
120120
}
121121

122+
func (r *Reader) WaitInit(ctx context.Context) error {
123+
return r.reader.WaitInit(ctx)
124+
}
125+
122126
func (r *Reader) ID() int64 {
123127
return r.readerID
124128
}

internal/topic/topicreaderinternal/reader_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"testing"
88

99
"github.com/golang/mock/gomock"
10+
"github.com/stretchr/testify/assert"
1011
"github.com/stretchr/testify/require"
1112

1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
@@ -153,3 +154,19 @@ func TestReader_Commit(t *testing.T) {
153154
require.ErrorIs(t, err, errCommitSessionFromOtherReader)
154155
})
155156
}
157+
158+
func TestReader_WaitInit(t *testing.T) {
159+
mc := gomock.NewController(t)
160+
defer mc.Finish()
161+
162+
readerID := nextReaderID()
163+
baseReader := NewMockbatchedStreamReader(mc)
164+
reader := &Reader{
165+
reader: baseReader,
166+
readerID: readerID,
167+
}
168+
169+
baseReader.EXPECT().WaitInit(gomock.Any())
170+
err := reader.WaitInit(context.Background())
171+
assert.NoError(t, err)
172+
}

internal/topic/topicreaderinternal/stream_reader_impl.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,13 @@ func newTopicStreamReaderStopped(
163163
return res
164164
}
165165

166+
func (r *topicStreamReaderImpl) WaitInit(_ context.Context) error {
167+
if !r.started {
168+
return errors.New("not started: can be started only after initialize from constructor")
169+
}
170+
return nil
171+
}
172+
166173
func (r *topicStreamReaderImpl) ReadMessageBatch(
167174
ctx context.Context,
168175
opts ReadMessageBatchOptions,

internal/topic/topicreaderinternal/stream_reader_impl_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,21 @@ func TestTopicStreamReaderImpl_Create(t *testing.T) {
340340
})
341341
}
342342

343+
func TestTopicStreamReaderImpl_WaitInit(t *testing.T) {
344+
t.Run("OK", func(t *testing.T) {
345+
e := newTopicReaderTestEnv(t)
346+
e.Start()
347+
err := e.reader.WaitInit(context.Background())
348+
require.NoError(t, err)
349+
})
350+
351+
t.Run("not started", func(t *testing.T) {
352+
e := newTopicReaderTestEnv(t)
353+
err := e.reader.WaitInit(context.Background())
354+
require.Error(t, err)
355+
})
356+
}
357+
343358
func TestStreamReaderImpl_OnPartitionCloseHandle(t *testing.T) {
344359
xtest.TestManyTimesWithName(t, "GracefulFalseCancelPartitionContext", func(t testing.TB) {
345360
e := newTopicReaderTestEnv(t)

internal/topic/topicreaderinternal/stream_reconnector.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ type readerReconnector struct {
4949
streamVal batchedStreamReader
5050
streamErr error
5151
closedErr error
52+
53+
initErr error
54+
initDone bool
55+
initDoneCh empty.Chan
5256
}
5357

5458
//nolint:revive
@@ -70,6 +74,7 @@ func newReaderReconnector(
7074
baseContext: baseContext,
7175
retrySettings: retrySettings,
7276
}
77+
7378
if res.connectTimeout == 0 {
7479
res.connectTimeout = value.InfiniteDuration
7580
}
@@ -146,6 +151,13 @@ func (r *readerReconnector) CloseWithError(ctx context.Context, err error) error
146151
closeErr = streamCloseErr
147152
}
148153
}
154+
155+
r.m.WithLock(func() {
156+
if !r.initDone {
157+
r.initErr = closeErr
158+
close(r.initDoneCh)
159+
}
160+
})
149161
})
150162
return closeErr
151163
}
@@ -163,6 +175,7 @@ func (r *readerReconnector) initChannelsAndClock() {
163175
}
164176
r.reconnectFromBadStream = make(chan reconnectRequest, 1)
165177
r.streamConnectionInProgress = make(empty.Chan)
178+
r.initDoneCh = make(empty.Chan)
166179
close(r.streamConnectionInProgress) // no progress at start
167180
}
168181

@@ -261,6 +274,10 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead
261274
r.streamErr = err
262275
if err == nil {
263276
r.streamVal = newStream
277+
if !r.initDone {
278+
r.initDone = true
279+
close(r.initDoneCh)
280+
}
264281
}
265282
})
266283
return err
@@ -312,10 +329,22 @@ func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err err
312329
if res.err == nil {
313330
return res.stream, nil
314331
}
315-
316332
return nil, res.err
317333
}
318334

335+
func (r *readerReconnector) WaitInit(ctx context.Context) error {
336+
if ctx.Err() != nil {
337+
return ctx.Err()
338+
}
339+
340+
select {
341+
case <-ctx.Done():
342+
return ctx.Err()
343+
case <-r.initDoneCh:
344+
return r.initErr
345+
}
346+
}
347+
319348
func (r *readerReconnector) fireReconnectOnRetryableError(stream batchedStreamReader, err error) {
320349
if !r.isRetriableError(err) {
321350
return

internal/topic/topicreaderinternal/stream_reconnector_test.go

Lines changed: 90 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,11 @@ func TestTopicReaderReconnectorReadMessageBatch(t *testing.T) {
130130
})
131131

132132
xtest.TestManyTimesWithName(t, "OnClose", func(t testing.TB) {
133-
reconnector := &readerReconnector{tracer: &trace.Topic{}}
133+
reconnector := &readerReconnector{
134+
tracer: &trace.Topic{},
135+
streamErr: errUnconnected,
136+
}
137+
reconnector.initChannelsAndClock()
134138
testErr := errors.New("test'")
135139

136140
go func() {
@@ -256,7 +260,10 @@ func TestTopicReaderReconnectorConnectionLoop(t *testing.T) {
256260
t.Run("StartWithCancelledContext", func(t *testing.T) {
257261
ctx, cancel := xcontext.WithCancel(context.Background())
258262
cancel()
259-
reconnector := &readerReconnector{tracer: &trace.Topic{}}
263+
reconnector := &readerReconnector{
264+
tracer: &trace.Topic{},
265+
}
266+
reconnector.initChannelsAndClock()
260267
reconnector.reconnectionLoop(ctx) // must return
261268
})
262269
}
@@ -295,6 +302,84 @@ func TestTopicReaderReconnectorStart(t *testing.T) {
295302
_ = reconnector.CloseWithError(ctx, nil)
296303
}
297304

305+
func TestTopicReaderReconnectorWaitInit(t *testing.T) {
306+
t.Run("OK", func(t *testing.T) {
307+
mc := gomock.NewController(t)
308+
defer mc.Finish()
309+
310+
reconnector := &readerReconnector{
311+
tracer: &trace.Topic{},
312+
}
313+
reconnector.initChannelsAndClock()
314+
315+
stream := NewMockbatchedStreamReader(mc)
316+
317+
reconnector.readerConnect = readerConnectFuncMock(readerConnectFuncAnswer{
318+
callback: func(ctx context.Context) (batchedStreamReader, error) {
319+
return stream, nil
320+
},
321+
})
322+
323+
reconnector.start()
324+
325+
err := reconnector.WaitInit(context.Background())
326+
require.NoError(t, err)
327+
328+
// one more run is needed to check idempotency
329+
err = reconnector.WaitInit(context.Background())
330+
require.NoError(t, err)
331+
})
332+
333+
t.Run("contextDeadlineInProgress", func(t *testing.T) {
334+
mc := gomock.NewController(t)
335+
defer mc.Finish()
336+
337+
reconnector := &readerReconnector{
338+
tracer: &trace.Topic{},
339+
}
340+
reconnector.initChannelsAndClock()
341+
342+
stream := NewMockbatchedStreamReader(mc)
343+
344+
ctx, cancel := context.WithCancel(context.Background())
345+
reconnector.readerConnect = readerConnectFuncMock(readerConnectFuncAnswer{
346+
callback: func(ctx context.Context) (batchedStreamReader, error) {
347+
cancel()
348+
return stream, nil
349+
},
350+
})
351+
reconnector.start()
352+
353+
err := reconnector.WaitInit(ctx)
354+
require.ErrorIs(t, err, ctx.Err())
355+
})
356+
357+
t.Run("contextDeadlineBeforeStart", func(t *testing.T) {
358+
mc := gomock.NewController(t)
359+
defer mc.Finish()
360+
361+
reconnector := &readerReconnector{
362+
tracer: &trace.Topic{},
363+
}
364+
reconnector.initDoneCh = make(empty.Chan, 1)
365+
reconnector.initChannelsAndClock()
366+
367+
stream := NewMockbatchedStreamReader(mc)
368+
369+
reconnector.readerConnect = readerConnectFuncMock(readerConnectFuncAnswer{
370+
callback: func(ctx context.Context) (batchedStreamReader, error) {
371+
return stream, nil
372+
},
373+
})
374+
375+
ctx, cancel := context.WithCancel(context.Background())
376+
cancel()
377+
err := reconnector.WaitInit(ctx)
378+
379+
require.ErrorIs(t, err, ctx.Err())
380+
})
381+
}
382+
298383
func TestTopicReaderReconnectorFireReconnectOnRetryableError(t *testing.T) {
299384
t.Run("Ok", func(t *testing.T) {
300385
mc := gomock.NewController(t)
@@ -332,7 +417,9 @@ func TestTopicReaderReconnectorFireReconnectOnRetryableError(t *testing.T) {
332417
mc := gomock.NewController(t)
333418
defer mc.Finish()
334419

335-
reconnector := &readerReconnector{tracer: &trace.Topic{}}
420+
reconnector := &readerReconnector{
421+
tracer: &trace.Topic{},
422+
}
336423
stream := NewMockbatchedStreamReader(mc)
337424
reconnector.initChannelsAndClock()
338425

internal/topic/topicwriterinternal/writer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ func (w *Writer) Write(ctx context.Context, messages ...Message) error {
4848
return w.streamWriter.Write(ctx, messages)
4949
}
5050

51+
func (w *Writer) WaitInit(ctx context.Context) (info InitialInfo, err error) {
52+
return w.streamWriter.WaitInit(ctx)
53+
}
54+
5155
func (w *Writer) Close(ctx context.Context) error {
5256
return w.streamWriter.Close(ctx)
5357
}

0 commit comments

Comments
 (0)