Skip to content

Commit aa40432

Browse files
authored
Merge pull request #1018 Fixed init info in topic writer, when autoseq num turned off.
2 parents fd8aa33 + f90055a commit aa40432

File tree

6 files changed

+63
-16
lines changed

6 files changed

+63
-16
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Fixed init info in topic writer, when autoseq num turned off.
2+
13
## v3.55.1
24
* Supported column name prefix `__discard_column_` for discard columns in result sets
35
* Made `StatusIds_SESSION_EXPIRED` retriable for idempotent operations

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ func (w *WriterReconnector) startWriteStream(ctx, streamCtx context.Context, att
428428
}
429429

430430
func (w *WriterReconnector) needReceiveLastSeqNo() bool {
431-
res := w.cfg.AutoSetSeqNo && !w.firstConnectionHandled.Load()
431+
res := !w.firstConnectionHandled.Load()
432432
return res
433433
}
434434

@@ -490,7 +490,7 @@ func (w *WriterReconnector) onWriterChange(writerStream *SingleStreamWriter) {
490490
defer close(w.firstInitResponseProcessedChan)
491491
isFirstInit = true
492492

493-
if w.cfg.AutoSetSeqNo {
493+
if writerStream.LastSeqNumRequested {
494494
w.lastSeqNo = writerStream.ReceivedLastSeqNum
495495
}
496496
})

internal/topic/topicwriterinternal/writer_reconnector_test.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -328,8 +328,9 @@ func TestWriterImpl_InitSession(t *testing.T) {
328328
sessionID := "test-session-id"
329329

330330
w.onWriterChange(&SingleStreamWriter{
331-
ReceivedLastSeqNum: lastSeqNo,
332-
SessionID: sessionID,
331+
ReceivedLastSeqNum: lastSeqNo,
332+
LastSeqNumRequested: true,
333+
SessionID: sessionID,
333334
})
334335

335336
require.Equal(t, sessionID, w.sessionID)
@@ -344,7 +345,8 @@ func TestWriterImpl_WaitInit(t *testing.T) {
344345
LastSeqNum: int64(123),
345346
}
346347
w.onWriterChange(&SingleStreamWriter{
347-
ReceivedLastSeqNum: expectedInitData.LastSeqNum,
348+
ReceivedLastSeqNum: expectedInitData.LastSeqNum,
349+
LastSeqNumRequested: true,
348350
})
349351

350352
initData, err := w.WaitInit(context.Background())
@@ -432,9 +434,15 @@ func TestWriterImpl_Reconnect(t *testing.T) {
432434
connectionError error
433435
}
434436

437+
isFirstConnection := true
435438
newStream := func(name string) *MockRawTopicWriterStream {
436439
strm := NewMockRawTopicWriterStream(mc)
437440
initReq := testCreateInitRequest(w)
441+
if isFirstConnection {
442+
isFirstConnection = false
443+
} else {
444+
initReq.GetLastSeqNo = false
445+
}
438446

439447
streamClosed := make(empty.Chan)
440448
strm.EXPECT().CloseSend().Do(func() {

internal/topic/topicwriterinternal/writer_single_stream.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ type SingleStreamWriterConfig struct {
2424
stream RawTopicWriterStream
2525
queue *messageQueue
2626
encodersMap *EncoderMap
27-
getAutoSeq bool
27+
getLastSeqNum bool
2828
reconnectorInstanceID string
2929
}
3030

@@ -33,25 +33,26 @@ func newSingleStreamWriterConfig(
3333
stream RawTopicWriterStream,
3434
queue *messageQueue,
3535
encodersMap *EncoderMap,
36-
getAutoSeq bool,
36+
getLastSeqNum bool,
3737
reconnectorID string,
3838
) SingleStreamWriterConfig {
3939
return SingleStreamWriterConfig{
4040
WritersCommonConfig: common,
4141
stream: stream,
4242
queue: queue,
4343
encodersMap: encodersMap,
44-
getAutoSeq: getAutoSeq,
44+
getLastSeqNum: getLastSeqNum,
4545
reconnectorInstanceID: reconnectorID,
4646
}
4747
}
4848

4949
type SingleStreamWriter struct {
50-
ReceivedLastSeqNum int64
51-
SessionID string
52-
PartitionID int64
53-
CodecsFromServer rawtopiccommon.SupportedCodecs
54-
Encoder EncoderSelector
50+
ReceivedLastSeqNum int64
51+
LastSeqNumRequested bool
52+
SessionID string
53+
PartitionID int64
54+
CodecsFromServer rawtopiccommon.SupportedCodecs
55+
Encoder EncoderSelector
5556

5657
cfg SingleStreamWriterConfig
5758
allowedCodecs rawtopiccommon.SupportedCodecs
@@ -152,6 +153,7 @@ func (w *SingleStreamWriter) initStream() (err error) {
152153
)
153154

154155
w.SessionID = result.SessionID
156+
w.LastSeqNumRequested = req.GetLastSeqNo
155157
w.ReceivedLastSeqNum = result.LastSeqNo
156158
w.PartitionID = result.PartitionID
157159
w.CodecsFromServer = result.SupportedCodecs
@@ -164,7 +166,7 @@ func (w *SingleStreamWriter) createInitRequest() rawtopicwriter.InitRequest {
164166
ProducerID: w.cfg.producerID,
165167
WriteSessionMeta: w.cfg.writerMeta,
166168
Partitioning: w.cfg.defaultPartitioning,
167-
GetLastSeqNo: w.cfg.getAutoSeq,
169+
GetLastSeqNo: w.cfg.getLastSeqNum,
168170
}
169171
}
170172

internal/topic/topicwriterinternal/writer_single_stream_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func TestWriterImpl_CreateInitMessage(t *testing.T) {
2020
defaultPartitioning: rawtopicwriter.NewPartitioningPartitionID(5),
2121
compressorCount: 1,
2222
},
23-
getAutoSeq: true,
23+
getLastSeqNum: true,
2424
}
2525
w := newSingleStreamWriterStopped(ctx, cfg)
2626
expected := rawtopicwriter.InitRequest{
@@ -36,7 +36,7 @@ func TestWriterImpl_CreateInitMessage(t *testing.T) {
3636
t.Run("WithoutGetLastSeq", func(t *testing.T) {
3737
ctx := xtest.Context(t)
3838
w := newSingleStreamWriterStopped(ctx,
39-
SingleStreamWriterConfig{getAutoSeq: false},
39+
SingleStreamWriterConfig{getLastSeqNum: false},
4040
)
4141
require.False(t, w.createInitRequest().GetLastSeqNo)
4242
})
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
//go:build integration
2+
// +build integration
3+
4+
package integration
5+
6+
import (
7+
"strings"
8+
"testing"
9+
10+
"github.com/stretchr/testify/require"
11+
12+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
13+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter"
14+
)
15+
16+
func TestRegressionIssue1011_WriteInitInfoLastSeqNum(t *testing.T) {
17+
scope := newScope(t)
18+
w1 := scope.TopicWriter()
19+
err := w1.Write(scope.Ctx, topicwriter.Message{
20+
Data: strings.NewReader("123"),
21+
})
22+
require.NoError(t, err)
23+
require.NoError(t, w1.Close(scope.Ctx))
24+
25+
// Check
26+
w2, err := scope.Driver().Topic().StartWriter(
27+
scope.TopicPath(),
28+
topicoptions.WithWriterProducerID(scope.TopicWriterProducerID()),
29+
topicoptions.WithWriterSetAutoSeqNo(false),
30+
)
31+
require.NoError(t, err)
32+
33+
info, err := w2.WaitInitInfo(scope.Ctx)
34+
require.Equal(t, int64(1), info.LastSeqNum)
35+
}

0 commit comments

Comments
 (0)