Skip to content

Commit 1bc78be

Browse files
authored
Merge pull request #570 from ydb-platform/Kikimr17104
* Added wrapping of errors from unary and stream results
2 parents 59dde88 + 9799559 commit 1bc78be

File tree

7 files changed

+225
-3
lines changed

7 files changed

+225
-3
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
* Refactored of applying grpc dial options with defaults
66
* Added `trace.Driver.{OnBalancerDialEntrypoint,OnBalancerClusterDiscoveryAttempt}` trace events
77
* Fixed compilation of package `internal/xresolver` with `google.golang.org/[email protected]`
8+
* Fixed returning `io.EOF` on `rows.Next` and `rows.NextResultSet`
9+
* Added wrapping of errors from unary and stream results
10+
* Added error throw on `database/sql.Conn.BeginTx()`, `*sql.Tx.ExecContext` and `*sql.Tx.QueryContext` if query mode is not `ydb.DataQueryMode`
11+
* Added test for `database/sql` scan-query
812

913
## v3.42.8
1014
* Fixed `internal/scheme/helpers/IsDirectoryExists(..)` recursive bug

internal/table/scanner/result.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,31 @@ type streamResult struct {
3333
close func(error) error
3434
}
3535

36+
// Err returns error caused Scanner to be broken.
37+
func (r *streamResult) Err() error {
38+
err := r.scanner.Err()
39+
if err != nil {
40+
return xerrors.WithStackTrace(err)
41+
}
42+
return nil
43+
}
44+
3645
type unaryResult struct {
3746
baseResult
3847

3948
sets []*Ydb.ResultSet
4049
nextSet int
4150
}
4251

52+
// Err returns error caused Scanner to be broken.
53+
func (r *unaryResult) Err() error {
54+
err := r.scanner.Err()
55+
if err != nil {
56+
return xerrors.WithStackTrace(err)
57+
}
58+
return nil
59+
}
60+
4361
// Close closes the result, preventing further iteration.
4462
func (r *unaryResult) Close() error {
4563
if atomic.CompareAndSwapUint32(&r.closed, 0, 1) {

internal/xsql/conn.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,10 @@ func (c *conn) BeginTx(ctx context.Context, txOptions driver.TxOptions) (_ drive
342342
defer func() {
343343
onDone(transaction, err)
344344
}()
345+
m := queryModeFromContext(ctx, c.defaultQueryMode)
346+
if m != DataQueryMode {
347+
return nil, badconn.Map(xerrors.WithStackTrace(fmt.Errorf("wrong query mode: %s", m.String())))
348+
}
345349
if !c.isReady() {
346350
return nil, badconn.Map(xerrors.WithStackTrace(errNotReadyConn))
347351
}

internal/xsql/rows.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,32 @@ func (r *rows) ColumnTypeDatabaseTypeName(index int) string {
6767
return yqlTypes[index]
6868
}
6969

70-
func (r *rows) NextResultSet() error {
70+
func (r *rows) NextResultSet() (err error) {
71+
defer func() {
72+
if err != nil && xerrors.Is(err, io.EOF) {
73+
// database/sql checks io.EOF with "==", not errors.Is(err, io.EOF)
74+
err = io.EOF
75+
}
76+
}()
7177
r.nextSet.Do(func() {})
72-
return r.result.NextResultSetErr(context.Background())
78+
err = r.result.NextResultSetErr(context.Background())
79+
if err != nil {
80+
return badconn.Map(xerrors.WithStackTrace(err))
81+
}
82+
return nil
7383
}
7484

7585
func (r *rows) HasNextResultSet() bool {
7686
return r.result.HasNextResultSet()
7787
}
7888

7989
func (r *rows) Next(dst []driver.Value) (err error) {
90+
defer func() {
91+
if err != nil && xerrors.Is(err, io.EOF) {
92+
// database/sql checks io.EOF with "==", not errors.Is(err, io.EOF)
93+
err = io.EOF
94+
}
95+
}()
8096
r.nextSet.Do(func() {
8197
err = r.result.NextResultSetErr(context.Background())
8298
})

internal/xsql/tx.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package xsql
33
import (
44
"context"
55
"database/sql/driver"
6+
"fmt"
67

78
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
89
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
@@ -70,6 +71,10 @@ func (tx *tx) QueryContext(ctx context.Context, query string, args []driver.Name
7071
defer func() {
7172
onDone(err)
7273
}()
74+
m := queryModeFromContext(ctx, tx.conn.defaultQueryMode)
75+
if m != DataQueryMode {
76+
return nil, badconn.Map(xerrors.WithStackTrace(fmt.Errorf("wrong query mode: %s", m.String())))
77+
}
7378
var res result.Result
7479
res, err = tx.tx.Execute(ctx,
7580
query,
@@ -93,6 +98,10 @@ func (tx *tx) ExecContext(ctx context.Context, query string, args []driver.Named
9398
defer func() {
9499
onDone(err)
95100
}()
101+
m := queryModeFromContext(ctx, tx.conn.defaultQueryMode)
102+
if m != DataQueryMode {
103+
return nil, badconn.Map(xerrors.WithStackTrace(fmt.Errorf("wrong query mode: %s", m.String())))
104+
}
96105
_, err = tx.tx.Execute(ctx,
97106
query,
98107
toQueryParams(args),

tests/database_sql_regression_test.go

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,24 @@ import (
77
"context"
88
"database/sql"
99
"errors"
10+
"fmt"
1011
"math/rand"
12+
"net/url"
1113
"os"
14+
"strconv"
1215
"testing"
1316
"time"
1417

18+
"github.com/stretchr/testify/require"
1519
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
1620

21+
"github.com/ydb-platform/ydb-go-sdk/v3"
1722
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1823
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql/badconn"
1924
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
25+
"github.com/ydb-platform/ydb-go-sdk/v3/sugar"
26+
"github.com/ydb-platform/ydb-go-sdk/v3/table"
27+
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
2028
)
2129

2230
func TestRegressionCloud109307(t *testing.T) {
@@ -71,3 +79,166 @@ func TestRegressionCloud109307(t *testing.T) {
7179
}
7280
}
7381
}
82+
83+
func TestRegressionKikimr17104(t *testing.T) {
84+
tablePath := "/database/sql/kikimr/17104/stream_query"
85+
if dsn, has := os.LookupEnv("YDB_CONNECTION_STRING"); !has {
86+
t.Errorf("expected YDB_CONNECTION_STRING environment variable")
87+
} else {
88+
u, err := url.Parse(dsn)
89+
require.NoError(t, err)
90+
tablePath = u.Path + tablePath
91+
}
92+
93+
var (
94+
upsertRowsCount = 100000
95+
upsertChecksum uint64
96+
)
97+
98+
ctx, cancel := context.WithTimeout(context.Background(), 42*time.Second)
99+
defer cancel()
100+
101+
t.Run("data", func(t *testing.T) {
102+
t.Run("prepare", func(t *testing.T) {
103+
var db *sql.DB
104+
defer func() {
105+
if db != nil {
106+
_ = db.Close()
107+
}
108+
}()
109+
t.Run("connect", func(t *testing.T) {
110+
var err error
111+
db, err = sql.Open("ydb", os.Getenv("YDB_CONNECTION_STRING"))
112+
require.NoError(t, err)
113+
})
114+
t.Run("scheme", func(t *testing.T) {
115+
var cc ydb.Connection
116+
t.Run("unwrap", func(t *testing.T) {
117+
var err error
118+
cc, err = ydb.Unwrap(db)
119+
require.NoError(t, err)
120+
})
121+
var tableExists bool
122+
t.Run("check_exists", func(t *testing.T) {
123+
var err error
124+
tableExists, err = sugar.IsTableExists(ctx, cc.Scheme(), tablePath)
125+
require.NoError(t, err)
126+
})
127+
if tableExists {
128+
t.Run("drop", func(t *testing.T) {
129+
err := retry.Do(ydb.WithQueryMode(ctx, ydb.SchemeQueryMode), db,
130+
func(ctx context.Context, cc *sql.Conn) (err error) {
131+
_, err = cc.ExecContext(ctx,
132+
fmt.Sprintf("DROP TABLE `%s`", tablePath),
133+
)
134+
if err != nil {
135+
return err
136+
}
137+
return nil
138+
}, retry.WithDoRetryOptions(retry.WithIdempotent(true)),
139+
)
140+
require.NoError(t, err)
141+
})
142+
}
143+
t.Run("create", func(t *testing.T) {
144+
err := retry.Do(ydb.WithQueryMode(ctx, ydb.SchemeQueryMode), db,
145+
func(ctx context.Context, cc *sql.Conn) (err error) {
146+
_, err = cc.ExecContext(ctx,
147+
fmt.Sprintf("CREATE TABLE `%s` (val Int32, PRIMARY KEY (val))", tablePath),
148+
)
149+
if err != nil {
150+
return err
151+
}
152+
return nil
153+
}, retry.WithDoRetryOptions(retry.WithIdempotent(true)),
154+
)
155+
require.NoError(t, err)
156+
})
157+
})
158+
t.Run("upsert", func(t *testing.T) {
159+
if v, ok := os.LookupEnv("UPSERT_ROWS_COUNT"); ok {
160+
var vv int
161+
vv, err := strconv.Atoi(v)
162+
require.NoError(t, err)
163+
upsertRowsCount = vv
164+
}
165+
// - upsert data
166+
fmt.Printf("> preparing values to upsert...\n")
167+
values := make([]types.Value, 0, upsertRowsCount)
168+
for i := 0; i < upsertRowsCount; i++ {
169+
upsertChecksum += uint64(i)
170+
values = append(values,
171+
types.StructValue(
172+
types.StructFieldValue("val", types.Int32Value(int32(i))),
173+
),
174+
)
175+
}
176+
fmt.Printf("> upsert data\n")
177+
err := retry.Do(ydb.WithQueryMode(ctx, ydb.DataQueryMode), db,
178+
func(ctx context.Context, cc *sql.Conn) (err error) {
179+
values := table.NewQueryParameters(table.ValueParam("$values", types.ListValue(values...)))
180+
declares, err := sugar.GenerateDeclareSection(values)
181+
require.NoError(t, err)
182+
_, err = cc.ExecContext(ctx,
183+
declares+fmt.Sprintf("UPSERT INTO `%s` SELECT val FROM AS_TABLE($values);", tablePath),
184+
values,
185+
)
186+
if err != nil {
187+
return err
188+
}
189+
return nil
190+
}, retry.WithDoRetryOptions(retry.WithIdempotent(true)),
191+
)
192+
require.NoError(t, err)
193+
})
194+
})
195+
t.Run("scan", func(t *testing.T) {
196+
var db *sql.DB
197+
defer func() {
198+
if db != nil {
199+
_ = db.Close()
200+
}
201+
}()
202+
t.Run("connect", func(t *testing.T) {
203+
var err error
204+
cc, err := ydb.Open(ctx, os.Getenv("YDB_CONNECTION_STRING"))
205+
require.NoError(t, err)
206+
connector, err := ydb.Connector(cc, ydb.WithDefaultQueryMode(ydb.ScanQueryMode))
207+
require.NoError(t, err)
208+
db = sql.OpenDB(connector)
209+
})
210+
t.Run("query", func(t *testing.T) {
211+
var (
212+
rowsCount int
213+
checkSum uint64
214+
)
215+
err := retry.Do(ydb.WithQueryMode(ctx, ydb.ScanQueryMode), db,
216+
func(ctx context.Context, cc *sql.Conn) (err error) {
217+
var rows *sql.Rows
218+
rowsCount = 0
219+
checkSum = 0
220+
rows, err = cc.QueryContext(ctx, fmt.Sprintf("SELECT val FROM `%s`", tablePath))
221+
if err != nil {
222+
return err
223+
}
224+
for rows.NextResultSet() {
225+
for rows.Next() {
226+
rowsCount++
227+
var val uint64
228+
err = rows.Scan(&val)
229+
if err != nil {
230+
return err
231+
}
232+
checkSum += val
233+
}
234+
}
235+
return rows.Err()
236+
}, retry.WithDoRetryOptions(retry.WithIdempotent(true)),
237+
)
238+
require.NoError(t, err)
239+
require.Equal(t, upsertRowsCount, rowsCount)
240+
require.Equal(t, upsertChecksum, checkSum)
241+
})
242+
})
243+
})
244+
}

tests/table_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,7 @@ func testTable(t testing.TB) {
646646
); err != nil {
647647
t.Fatalf("create table failed: %v\n", err)
648648
}
649-
fmt.Printf("> table stream_query openSessions\n")
649+
fmt.Printf("> table stream_query upsert data\n")
650650
var (
651651
upsertRowsCount = 100000
652652
sum uint64

0 commit comments

Comments
 (0)