@@ -7,16 +7,24 @@ import (
77 "context"
88 "database/sql"
99 "errors"
10+ "fmt"
1011 "math/rand"
12+ "net/url"
1113 "os"
14+ "strconv"
1215 "testing"
1316 "time"
1417
18+ "github.com/stretchr/testify/require"
1519 "github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
1620
21+ "github.com/ydb-platform/ydb-go-sdk/v3"
1722 "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1823 "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql/badconn"
1924 "github.com/ydb-platform/ydb-go-sdk/v3/retry"
25+ "github.com/ydb-platform/ydb-go-sdk/v3/sugar"
26+ "github.com/ydb-platform/ydb-go-sdk/v3/table"
27+ "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
2028)
2129
2230func TestRegressionCloud109307 (t * testing.T ) {
@@ -71,3 +79,166 @@ func TestRegressionCloud109307(t *testing.T) {
7179 }
7280 }
7381}
82+
83+ func TestRegressionKikimr17104 (t * testing.T ) {
84+ tablePath := "/database/sql/kikimr/17104/stream_query"
85+ if dsn , has := os .LookupEnv ("YDB_CONNECTION_STRING" ); ! has {
86+ t .Errorf ("expected YDB_CONNECTION_STRING environment variable" )
87+ } else {
88+ u , err := url .Parse (dsn )
89+ require .NoError (t , err )
90+ tablePath = u .Path + tablePath
91+ }
92+
93+ var (
94+ upsertRowsCount = 100000
95+ upsertChecksum uint64
96+ )
97+
98+ ctx , cancel := context .WithTimeout (context .Background (), 42 * time .Second )
99+ defer cancel ()
100+
101+ t .Run ("data" , func (t * testing.T ) {
102+ t .Run ("prepare" , func (t * testing.T ) {
103+ var db * sql.DB
104+ defer func () {
105+ if db != nil {
106+ _ = db .Close ()
107+ }
108+ }()
109+ t .Run ("connect" , func (t * testing.T ) {
110+ var err error
111+ db , err = sql .Open ("ydb" , os .Getenv ("YDB_CONNECTION_STRING" ))
112+ require .NoError (t , err )
113+ })
114+ t .Run ("scheme" , func (t * testing.T ) {
115+ var cc ydb.Connection
116+ t .Run ("unwrap" , func (t * testing.T ) {
117+ var err error
118+ cc , err = ydb .Unwrap (db )
119+ require .NoError (t , err )
120+ })
121+ var tableExists bool
122+ t .Run ("check_exists" , func (t * testing.T ) {
123+ var err error
124+ tableExists , err = sugar .IsTableExists (ctx , cc .Scheme (), tablePath )
125+ require .NoError (t , err )
126+ })
127+ if tableExists {
128+ t .Run ("drop" , func (t * testing.T ) {
129+ err := retry .Do (ydb .WithQueryMode (ctx , ydb .SchemeQueryMode ), db ,
130+ func (ctx context.Context , cc * sql.Conn ) (err error ) {
131+ _ , err = cc .ExecContext (ctx ,
132+ fmt .Sprintf ("DROP TABLE `%s`" , tablePath ),
133+ )
134+ if err != nil {
135+ return err
136+ }
137+ return nil
138+ }, retry .WithDoRetryOptions (retry .WithIdempotent (true )),
139+ )
140+ require .NoError (t , err )
141+ })
142+ }
143+ t .Run ("create" , func (t * testing.T ) {
144+ err := retry .Do (ydb .WithQueryMode (ctx , ydb .SchemeQueryMode ), db ,
145+ func (ctx context.Context , cc * sql.Conn ) (err error ) {
146+ _ , err = cc .ExecContext (ctx ,
147+ fmt .Sprintf ("CREATE TABLE `%s` (val Int32, PRIMARY KEY (val))" , tablePath ),
148+ )
149+ if err != nil {
150+ return err
151+ }
152+ return nil
153+ }, retry .WithDoRetryOptions (retry .WithIdempotent (true )),
154+ )
155+ require .NoError (t , err )
156+ })
157+ })
158+ t .Run ("upsert" , func (t * testing.T ) {
159+ if v , ok := os .LookupEnv ("UPSERT_ROWS_COUNT" ); ok {
160+ var vv int
161+ vv , err := strconv .Atoi (v )
162+ require .NoError (t , err )
163+ upsertRowsCount = vv
164+ }
165+ // - upsert data
166+ fmt .Printf ("> preparing values to upsert...\n " )
167+ values := make ([]types.Value , 0 , upsertRowsCount )
168+ for i := 0 ; i < upsertRowsCount ; i ++ {
169+ upsertChecksum += uint64 (i )
170+ values = append (values ,
171+ types .StructValue (
172+ types .StructFieldValue ("val" , types .Int32Value (int32 (i ))),
173+ ),
174+ )
175+ }
176+ fmt .Printf ("> upsert data\n " )
177+ err := retry .Do (ydb .WithQueryMode (ctx , ydb .DataQueryMode ), db ,
178+ func (ctx context.Context , cc * sql.Conn ) (err error ) {
179+ values := table .NewQueryParameters (table .ValueParam ("$values" , types .ListValue (values ... )))
180+ declares , err := sugar .GenerateDeclareSection (values )
181+ require .NoError (t , err )
182+ _ , err = cc .ExecContext (ctx ,
183+ declares + fmt .Sprintf ("UPSERT INTO `%s` SELECT val FROM AS_TABLE($values);" , tablePath ),
184+ values ,
185+ )
186+ if err != nil {
187+ return err
188+ }
189+ return nil
190+ }, retry .WithDoRetryOptions (retry .WithIdempotent (true )),
191+ )
192+ require .NoError (t , err )
193+ })
194+ })
195+ t .Run ("scan" , func (t * testing.T ) {
196+ var db * sql.DB
197+ defer func () {
198+ if db != nil {
199+ _ = db .Close ()
200+ }
201+ }()
202+ t .Run ("connect" , func (t * testing.T ) {
203+ var err error
204+ cc , err := ydb .Open (ctx , os .Getenv ("YDB_CONNECTION_STRING" ))
205+ require .NoError (t , err )
206+ connector , err := ydb .Connector (cc , ydb .WithDefaultQueryMode (ydb .ScanQueryMode ))
207+ require .NoError (t , err )
208+ db = sql .OpenDB (connector )
209+ })
210+ t .Run ("query" , func (t * testing.T ) {
211+ var (
212+ rowsCount int
213+ checkSum uint64
214+ )
215+ err := retry .Do (ydb .WithQueryMode (ctx , ydb .ScanQueryMode ), db ,
216+ func (ctx context.Context , cc * sql.Conn ) (err error ) {
217+ var rows * sql.Rows
218+ rowsCount = 0
219+ checkSum = 0
220+ rows , err = cc .QueryContext (ctx , fmt .Sprintf ("SELECT val FROM `%s`" , tablePath ))
221+ if err != nil {
222+ return err
223+ }
224+ for rows .NextResultSet () {
225+ for rows .Next () {
226+ rowsCount ++
227+ var val uint64
228+ err = rows .Scan (& val )
229+ if err != nil {
230+ return err
231+ }
232+ checkSum += val
233+ }
234+ }
235+ return rows .Err ()
236+ }, retry .WithDoRetryOptions (retry .WithIdempotent (true )),
237+ )
238+ require .NoError (t , err )
239+ require .Equal (t , upsertRowsCount , rowsCount )
240+ require .Equal (t , upsertChecksum , checkSum )
241+ })
242+ })
243+ })
244+ }
0 commit comments