@@ -6,6 +6,7 @@ package integration
66import (
77 "context"
88 "errors"
9+ "fmt"
910 "io"
1011 "os"
1112 "strconv"
@@ -69,6 +70,112 @@ func TestTopicReadInTransaction(t *testing.T) {
6970 }))
7071}
7172
73+ func TestTopicReaderTLIIssue1797 (t * testing.T ) {
74+ scope := newScope (t )
75+ ctx := scope .Ctx
76+ db := scope .Driver ()
77+
78+ tablePath := scope .TablePath ()
79+
80+ writer := scope .TopicWriter ()
81+ reader := scope .TopicReader ()
82+
83+ scope .Require .NoError (writer .Write (ctx , topicwriter.Message {Data : strings .NewReader ("1" )}))
84+
85+ messageReaded := make (chan bool )
86+ valueInserted := make (chan bool )
87+
88+ go func () {
89+ <- messageReaded
90+ db .Query ().Exec (ctx , fmt .Sprintf ("INSERT INTO `%s` (id) VALUES (2)" , tablePath ))
91+ close (valueInserted )
92+ }()
93+
94+ attempts := 0
95+ scope .Require .NoError (db .Query ().DoTx (ctx , func (ctx context.Context , tx query.TxActor ) error {
96+ attempts ++
97+
98+ row , err := tx .QueryRow (ctx , fmt .Sprintf ("SELECT COUNT(*) FROM `%s`" , tablePath ))
99+ scope .Require .NoError (err )
100+ var cnt uint64
101+ scope .Require .NoError (row .Scan (& cnt ))
102+ fmt .Println ("table items count" , cnt )
103+
104+ _ , err = reader .PopMessagesBatchTx (ctx , tx )
105+ scope .Require .NoError (err )
106+ if attempts == 1 {
107+ close (messageReaded )
108+ }
109+
110+ <- valueInserted
111+ err = tx .Exec (ctx , fmt .Sprintf ("UPSERT INTO `%s` (id) VALUES (3)" , tablePath ))
112+ if err != nil {
113+ t .Log ("UPSERT value 3 failed:" , err )
114+
115+ return err
116+ }
117+
118+ return nil
119+ }))
120+ }
121+
122+ func TestTopicWriterTLI (t * testing.T ) {
123+ scope := newScope (t )
124+ ctx := scope .Ctx
125+ db := scope .Driver ()
126+
127+ tablePath := scope .TablePath ()
128+
129+ messageWritten := make (chan bool )
130+ valueInserted := make (chan bool )
131+
132+ go func () {
133+ <- messageWritten
134+ db .Query ().Exec (ctx , fmt .Sprintf ("INSERT INTO `%s` (id) VALUES (2)" , tablePath ))
135+ close (valueInserted )
136+ }()
137+
138+ attempts := 0
139+ scope .Require .NoError (db .Query ().DoTx (ctx , func (ctx context.Context , tx query.TxActor ) error {
140+ attempts ++
141+
142+ row , err := tx .QueryRow (ctx , fmt .Sprintf ("SELECT COUNT(*) FROM `%s`" , tablePath ))
143+ scope .Require .NoError (err )
144+ var cnt uint64
145+ scope .Require .NoError (row .Scan (& cnt ))
146+ fmt .Println ("table items count" , cnt )
147+
148+ txWriter , err := scope .Driver ().Topic ().StartTransactionalWriter (tx , scope .TopicPath ())
149+ scope .Require .NoError (err )
150+
151+ err = txWriter .Write (ctx , topicwriter.Message {Data : strings .NewReader ("test" )})
152+ scope .Require .NoError (err )
153+ if attempts == 1 {
154+ close (messageWritten )
155+ }
156+
157+ <- valueInserted
158+ err = tx .Exec (ctx , fmt .Sprintf ("UPSERT INTO `%s` (id) VALUES (3)" , tablePath ))
159+ if err != nil {
160+ t .Log ("UPSERT value 3 failed:" , err )
161+ return err
162+ }
163+
164+ return nil
165+ }))
166+
167+ // Check retries
168+ scope .Require .Greater (attempts , 1 )
169+
170+ // Verify the message was written
171+ batch , err := scope .TopicReader ().ReadMessagesBatch (ctx )
172+ scope .Require .NoError (err )
173+ scope .Require .Len (batch .Messages , 1 )
174+ content , err := io .ReadAll (batch .Messages [0 ])
175+ scope .Require .NoError (err )
176+ scope .Require .Equal ("test" , string (content ))
177+ }
178+
72179func TestWriteInTransaction (t * testing.T ) {
73180 if os .Getenv ("YDB_VERSION" ) != "nightly" && version .Lt (os .Getenv ("YDB_VERSION" ), "25.0" ) {
74181 t .Skip ("require enables transactions for topics" )
0 commit comments