Skip to content

Commit 89e6c3c

Browse files
authored
Merge pull request #359 from ydb-platform/kikimr-15560-fix-nil-stream
fix panic on nil stream in connect failed
2 parents 68a9dbb + 9a1d92e commit 89e6c3c

File tree

4 files changed

+31
-5
lines changed

4 files changed

+31
-5
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Fixed nil pointer exception in topic reader if reconnect failed
2+
13
## v3.37.1
24
* Refactored the `xsql.badconn.Error`
35

internal/topic/topicreaderinternal/stream_reconnector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ func (r *readerReconnector) reconnect(ctx context.Context, oldReader batchedStre
242242
}
243243

244244
r.m.WithLock(func() {
245-
r.streamErr = nil
245+
r.streamErr = err
246246
if err == nil {
247247
r.streamVal = newStream
248248
}

internal/topic/topicreaderinternal/stream_reconnector_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,22 @@ func TestTopicReaderReconnectorFireReconnectOnRetryableError(t *testing.T) {
341341
})
342342
}
343343

344+
func TestTopicReaderReconnectorReconnectWithError(t *testing.T) {
345+
ctx := context.Background()
346+
testErr := errors.New("test")
347+
reconnector := &readerReconnector{
348+
connectTimeout: infiniteTimeout,
349+
readerConnect: func(ctx context.Context) (batchedStreamReader, error) {
350+
return nil, testErr
351+
},
352+
streamErr: errors.New("start-error"),
353+
}
354+
reconnector.initChannelsAndClock()
355+
err := reconnector.reconnect(ctx, nil)
356+
require.ErrorIs(t, err, testErr)
357+
require.ErrorIs(t, reconnector.streamErr, testErr)
358+
}
359+
344360
type readerConnectFuncAnswer struct {
345361
callback readerConnectFunc
346362
stream batchedStreamReader

topic/reader_e2e_test.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ import (
2727
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2828
)
2929

30+
const (
31+
consumerName = "test-consumer"
32+
)
33+
3034
func TestReadMessages(t *testing.T) {
3135
ctx := testCtx(t)
3236

@@ -256,21 +260,21 @@ func createCDCFeed(ctx context.Context, t *testing.T, db ydb.Connection) {
256260
})
257261
require.NoError(t, err)
258262

259-
topicPath := db.Name() + "/test/feed"
263+
topicPath := testCDCFeedName(db)
260264

261265
require.NoError(t, err)
262266

263267
err = db.Topic().Alter(
264268
ctx,
265269
topicPath,
266-
topicoptions.AlterWithAddConsumers(topictypes.Consumer{Name: "test"}),
270+
topicoptions.AlterWithAddConsumers(topictypes.Consumer{Name: consumerName}),
267271
)
268272
require.NoError(t, err)
269273
}
270274

271275
func createFeedReader(t *testing.T, db ydb.Connection, opts ...topicoptions.ReaderOption) *topicreader.Reader {
272-
topicPath := db.Name() + "/test/feed"
273-
reader, err := db.Topic().StartReader("test", []topicoptions.ReadSelector{
276+
topicPath := testCDCFeedName(db)
277+
reader, err := db.Topic().StartReader(consumerName, []topicoptions.ReadSelector{
274278
{
275279
Path: topicPath,
276280
},
@@ -316,3 +320,7 @@ func testCtx(t testing.TB) context.Context {
316320
pprof.SetGoroutineLabels(pprof.WithLabels(ctx, pprof.Labels("test", t.Name())))
317321
return ctx
318322
}
323+
324+
func testCDCFeedName(db ydb.Connection) string {
325+
return db.Name() + "/test/feed"
326+
}

0 commit comments

Comments
 (0)