Skip to content

Commit e509053

Browse files
authored
Fixed topic reader TLI
Fixed fail topic reader on TLI
2 parents 329010f + 5336b40 commit e509053

File tree

3 files changed

+110
-1
lines changed

3 files changed

+110
-1
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Fixed stop topic reader after TLI in transaction
2+
13
## v3.108.4
24
* Removed `experimental` from coordination API
35
* Added `WithReaderLogContext`, `WithWriterLogContext` options to topic reader/writer to supply log entries with user context fields

internal/topic/topicreaderinternal/stream_reader_impl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ func (r *topicStreamReaderImpl) addOnTransactionCompletedHandler(
301301
if transactionResult == nil {
302302
topicreadercommon.BatchGetPartitionSession(batch).SetCommittedOffsetForward(commitRange.CommitOffsetEnd)
303303
} else {
304-
_ = r.CloseWithError(xcontext.ValueOnly(ctx), xerrors.WithStackTrace(xerrors.RetryableError(
304+
_ = r.CloseWithError(xcontext.ValueOnly(ctx), xerrors.WithStackTrace(xerrors.Retryable(
305305
fmt.Errorf("ydb: failed batch commit because transaction doesn't committed: %w", updateOffesetInTransactionErr),
306306
)))
307307
}

tests/integration/topic_transactions_test.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package integration
66
import (
77
"context"
88
"errors"
9+
"fmt"
910
"io"
1011
"os"
1112
"strconv"
@@ -69,6 +70,112 @@ func TestTopicReadInTransaction(t *testing.T) {
6970
}))
7071
}
7172

73+
func TestTopicReaderTLIIssue1797(t *testing.T) {
74+
scope := newScope(t)
75+
ctx := scope.Ctx
76+
db := scope.Driver()
77+
78+
tablePath := scope.TablePath()
79+
80+
writer := scope.TopicWriter()
81+
reader := scope.TopicReader()
82+
83+
scope.Require.NoError(writer.Write(ctx, topicwriter.Message{Data: strings.NewReader("1")}))
84+
85+
messageReaded := make(chan bool)
86+
valueInserted := make(chan bool)
87+
88+
go func() {
89+
<-messageReaded
90+
db.Query().Exec(ctx, fmt.Sprintf("INSERT INTO `%s` (id) VALUES (2)", tablePath))
91+
close(valueInserted)
92+
}()
93+
94+
attempts := 0
95+
scope.Require.NoError(db.Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error {
96+
attempts++
97+
98+
row, err := tx.QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM `%s`", tablePath))
99+
scope.Require.NoError(err)
100+
var cnt uint64
101+
scope.Require.NoError(row.Scan(&cnt))
102+
fmt.Println("table items count", cnt)
103+
104+
_, err = reader.PopMessagesBatchTx(ctx, tx)
105+
scope.Require.NoError(err)
106+
if attempts == 1 {
107+
close(messageReaded)
108+
}
109+
110+
<-valueInserted
111+
err = tx.Exec(ctx, fmt.Sprintf("UPSERT INTO `%s` (id) VALUES (3)", tablePath))
112+
if err != nil {
113+
t.Log("UPSERT value 3 failed:", err)
114+
115+
return err
116+
}
117+
118+
return nil
119+
}))
120+
}
121+
122+
func TestTopicWriterTLI(t *testing.T) {
123+
scope := newScope(t)
124+
ctx := scope.Ctx
125+
db := scope.Driver()
126+
127+
tablePath := scope.TablePath()
128+
129+
messageWritten := make(chan bool)
130+
valueInserted := make(chan bool)
131+
132+
go func() {
133+
<-messageWritten
134+
db.Query().Exec(ctx, fmt.Sprintf("INSERT INTO `%s` (id) VALUES (2)", tablePath))
135+
close(valueInserted)
136+
}()
137+
138+
attempts := 0
139+
scope.Require.NoError(db.Query().DoTx(ctx, func(ctx context.Context, tx query.TxActor) error {
140+
attempts++
141+
142+
row, err := tx.QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM `%s`", tablePath))
143+
scope.Require.NoError(err)
144+
var cnt uint64
145+
scope.Require.NoError(row.Scan(&cnt))
146+
fmt.Println("table items count", cnt)
147+
148+
txWriter, err := scope.Driver().Topic().StartTransactionalWriter(tx, scope.TopicPath())
149+
scope.Require.NoError(err)
150+
151+
err = txWriter.Write(ctx, topicwriter.Message{Data: strings.NewReader("test")})
152+
scope.Require.NoError(err)
153+
if attempts == 1 {
154+
close(messageWritten)
155+
}
156+
157+
<-valueInserted
158+
err = tx.Exec(ctx, fmt.Sprintf("UPSERT INTO `%s` (id) VALUES (3)", tablePath))
159+
if err != nil {
160+
t.Log("UPSERT value 3 failed:", err)
161+
return err
162+
}
163+
164+
return nil
165+
}))
166+
167+
// Check retries
168+
scope.Require.Greater(attempts, 1)
169+
170+
// Verify the message was written
171+
batch, err := scope.TopicReader().ReadMessagesBatch(ctx)
172+
scope.Require.NoError(err)
173+
scope.Require.Len(batch.Messages, 1)
174+
content, err := io.ReadAll(batch.Messages[0])
175+
scope.Require.NoError(err)
176+
scope.Require.Equal("test", string(content))
177+
}
178+
72179
func TestWriteInTransaction(t *testing.T) {
73180
if os.Getenv("YDB_VERSION") != "nightly" && version.Lt(os.Getenv("YDB_VERSION"), "25.0") {
74181
t.Skip("require enables transactions for topics")

0 commit comments

Comments
 (0)