@@ -3,6 +3,7 @@ package testutils
33import (
44 "bufio"
55 "context"
6+ "database/sql"
67 "fmt"
78 "io"
89 "os"
@@ -23,6 +24,8 @@ import (
2324 . "github.com/onsi/ginkgo/v2"
2425 . "github.com/onsi/gomega"
2526
27+ ydb "github.com/ydb-platform/ydb-go-sdk/v3"
28+ "github.com/ydb-platform/ydb-go-sdk/v3/retry"
2629 v1alpha1 "github.com/ydb-platform/ydb-kubernetes-operator/api/v1alpha1"
2730 . "github.com/ydb-platform/ydb-kubernetes-operator/internal/controllers/constants"
2831 testobjects "github.com/ydb-platform/ydb-kubernetes-operator/tests/test-k8s-objects"
@@ -34,11 +37,17 @@ const (
3437 Interval = time .Second * 2
3538 YdbOperatorRemoteChart = "ydb/ydb-operator"
3639 YdbOperatorReleaseName = "ydb-operator"
40+ TestTablePath = "testfolder/testtable"
3741)
3842
3943var (
4044 pathToHelmValuesInLocalInstall = filepath .Join (".." , "cfg" , "operator-local-values.yaml" )
4145 pathToHelmValuesInRemoteInstall = filepath .Join (".." , "cfg" , "operator-values.yaml" )
46+
47+ createTableQuery = fmt .Sprintf ("CREATE TABLE `%s` (testColumnA Utf8, testColumnB Utf8, PRIMARY KEY (testColumnA));" , TestTablePath )
48+ insertQuery = fmt .Sprintf ("INSERT INTO `%s` (testColumnA, testColumnB) VALUES ('valueA', 'valueB');" , TestTablePath )
49+ selectQuery = fmt .Sprintf ("SELECT testColumnA, testColumnB FROM `%s`;" , TestTablePath )
50+ dropTableQuery = fmt .Sprintf ("DROP TABLE `%s`;" , TestTablePath )
4251)
4352
4453func InstallLocalOperatorWithHelm (namespace string ) {
@@ -203,8 +212,6 @@ func BringYdbCliToPod(podName, podNamespace string) {
203212}
204213
205214func ExecuteSimpleTableE2ETest (podName , podNamespace , storageEndpoint string , databasePath string ) {
206- tablePath := "testfolder/testtable"
207-
208215 tableCreatingInterval := time .Second * 10
209216
210217 Eventually (func (g Gomega ) {
@@ -217,7 +224,7 @@ func ExecuteSimpleTableE2ETest(podName, podNamespace, storageEndpoint string, da
217224 "-e" , storageEndpoint ,
218225 "yql" ,
219226 "-s" ,
220- fmt . Sprintf ( "CREATE TABLE `%s` (testColumnA Utf8, testColumnB Utf8, PRIMARY KEY (testColumnA));" , tablePath ) ,
227+ createTableQuery ,
221228 }
222229 output , _ := exec .Command ("kubectl" , args ... ).CombinedOutput ()
223230 fmt .Println (string (output ))
@@ -232,7 +239,7 @@ func ExecuteSimpleTableE2ETest(podName, podNamespace, storageEndpoint string, da
232239 "-e" , storageEndpoint ,
233240 "yql" ,
234241 "-s" ,
235- fmt . Sprintf ( "INSERT INTO `%s` (testColumnA, testColumnB) VALUES ('valueA', 'valueB');" , tablePath ) ,
242+ insertQuery ,
236243 }
237244 output , err := exec .Command ("kubectl" , argsInsert ... ).CombinedOutput ()
238245 Expect (err ).ShouldNot (HaveOccurred (), string (output ))
@@ -247,7 +254,7 @@ func ExecuteSimpleTableE2ETest(podName, podNamespace, storageEndpoint string, da
247254 "yql" ,
248255 "--format" , "csv" ,
249256 "-s" ,
250- fmt . Sprintf ( "SELECT * FROM `%s`;" , tablePath ) ,
257+ selectQuery ,
251258 }
252259 output , err = exec .Command ("kubectl" , argsSelect ... ).CombinedOutput ()
253260 Expect (err ).ShouldNot (HaveOccurred (), string (output ))
@@ -262,12 +269,74 @@ func ExecuteSimpleTableE2ETest(podName, podNamespace, storageEndpoint string, da
262269 "-e" , storageEndpoint ,
263270 "yql" ,
264271 "-s" ,
265- fmt . Sprintf ( "DROP TABLE `%s`;" , tablePath ) ,
272+ dropTableQuery ,
266273 }
267274 output , err = exec .Command ("kubectl" , argsDrop ... ).CombinedOutput ()
268275 Expect (err ).ShouldNot (HaveOccurred (), string (output ))
269276}
270277
278+ func ExecuteSimpleTableE2ETestWithSDK (databasePath string ) {
279+ ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
280+ defer cancel ()
281+
282+ cc , err := ydb .Open (ctx , fmt .Sprintf ("grpc://localhost:2135/%s" , databasePath ))
283+ Expect (err ).ShouldNot (HaveOccurred (), string (err .Error ()))
284+ defer func () { _ = cc .Close (ctx ) }()
285+
286+ c , err := ydb .Connector (cc ,
287+ ydb .WithAutoDeclare (),
288+ ydb .WithTablePathPrefix (TestTablePath ),
289+ )
290+ Expect (err ).ShouldNot (HaveOccurred ())
291+ defer func () { _ = c .Close () }()
292+
293+ db := sql .OpenDB (c )
294+ defer func () { _ = db .Close () }()
295+
296+ err = retry .Do (ctx , db , func (ctx context.Context , cc * sql.Conn ) error {
297+ _ , err = cc .ExecContext (ydb .WithQueryMode (ctx , ydb .SchemeQueryMode ), createTableQuery )
298+ if err != nil {
299+ return err
300+ }
301+ return nil
302+ }, retry .WithIdempotent (true ))
303+ Expect (err ).ShouldNot (HaveOccurred ())
304+
305+ err = retry .DoTx (ctx , db , func (ctx context.Context , tx * sql.Tx ) error {
306+ if _ , err = tx .ExecContext (ctx , insertQuery ); err != nil {
307+ return err
308+ }
309+ return nil
310+ }, retry .WithIdempotent (true ))
311+ Expect (err ).ShouldNot (HaveOccurred ())
312+
313+ var (
314+ testColumnA string
315+ testColumnB string
316+ )
317+ err = retry .Do (ctx , db , func (ctx context.Context , cc * sql.Conn ) (err error ) {
318+ row := cc .QueryRowContext (ctx , selectQuery )
319+ if err = row .Scan (& testColumnA , & testColumnB ); err != nil {
320+ return err
321+ }
322+
323+ return nil
324+ }, retry .WithIdempotent (true ))
325+ Expect (err ).ShouldNot (HaveOccurred ())
326+ Expect (testColumnA ).To (BeEquivalentTo ("valueA" ))
327+ Expect (testColumnB ).To (BeEquivalentTo ("valueB" ))
328+
329+ err = retry .Do (ctx , db , func (ctx context.Context , cc * sql.Conn ) error {
330+ _ , err = cc .ExecContext (ydb .WithQueryMode (ctx , ydb .SchemeQueryMode ), dropTableQuery )
331+ if err != nil {
332+ return err
333+ }
334+
335+ return nil
336+ }, retry .WithIdempotent (true ))
337+ Expect (err ).ShouldNot (HaveOccurred ())
338+ }
339+
271340func PortForward (
272341 ctx context.Context ,
273342 svcName , svcNamespace , serverName string ,
0 commit comments