Skip to content

Commit a4bdae2

Browse files
committed
added test for Operation error ABORTED (Transaction locks invalidated)
1 parent 7dd9ec4 commit a4bdae2

File tree

9 files changed

+204
-77
lines changed

9 files changed

+204
-77
lines changed

.github/workflows/tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ jobs:
116116
name: connection
117117
- name: Test database/sql
118118
run: go test -race -coverpkg=./... -coverprofile database_sql.txt -covermode atomic ./sql_e2e_test.go
119-
- name: Upload coverage to Codecov
119+
- name: Upload database/sql coverage report to Codecov
120120
uses: codecov/codecov-action@v2
121121
with:
122122
file: ./database_sql.txt

internal/xsql/conn.go

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ func (c *conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, e
119119
}
120120
return &stmt{
121121
conn: c,
122+
tx: c.currentTx,
122123
params: internal.Params(s),
123124
query: query,
124125
}, nil
@@ -153,12 +154,22 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
153154
return nil, errClosedConn
154155
}
155156
m := queryModeFromContext(ctx, c.defaultQueryMode)
156-
if c.currentTx != nil && m == DataQueryMode {
157+
if c.currentTx != nil {
158+
if m != DataQueryMode {
159+
return nil, xerrors.WithStackTrace(
160+
fmt.Errorf("query mode `%s` not supported with transaction", m.String()),
161+
)
162+
}
157163
return c.currentTx.ExecContext(ctx, query, args)
158164
}
159165
switch m {
160166
case DataQueryMode:
161-
_, res, err := c.session.Execute(ctx, txControl(ctx, c.defaultTxControl), query, toQueryParams(args))
167+
_, res, err := c.session.Execute(ctx,
168+
txControl(ctx, c.defaultTxControl),
169+
query,
170+
toQueryParams(args),
171+
dataQueryOptions(ctx)...,
172+
)
162173
if err != nil {
163174
return nil, c.checkClosed(err)
164175
}
@@ -167,7 +178,7 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
167178
}
168179
return c, nil
169180
case SchemeQueryMode:
170-
err := c.session.ExecuteSchemeQuery(ctx, query, toSchemeOptions(args)...)
181+
err := c.session.ExecuteSchemeQuery(ctx, query)
171182
if err != nil {
172183
return nil, c.checkClosed(err)
173184
}
@@ -188,12 +199,22 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
188199
return nil, errClosedConn
189200
}
190201
m := queryModeFromContext(ctx, c.defaultQueryMode)
191-
if c.currentTx != nil && m == DataQueryMode {
202+
if c.currentTx != nil {
203+
if m != DataQueryMode {
204+
return nil, xerrors.WithStackTrace(
205+
fmt.Errorf("query mode `%s` not supported with transaction", m.String()),
206+
)
207+
}
192208
return c.currentTx.QueryContext(ctx, query, args)
193209
}
194210
switch m {
195211
case DataQueryMode:
196-
_, res, err := c.session.Execute(ctx, txControl(ctx, c.defaultTxControl), query, toQueryParams(args))
212+
_, res, err := c.session.Execute(ctx,
213+
txControl(ctx, c.defaultTxControl),
214+
query,
215+
toQueryParams(args),
216+
dataQueryOptions(ctx)...,
217+
)
197218
if err != nil {
198219
return nil, c.checkClosed(err)
199220
}
@@ -204,7 +225,11 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
204225
result: res,
205226
}, nil
206227
case ScanQueryMode:
207-
res, err := c.session.StreamExecuteScanQuery(ctx, query, toQueryParams(args), scanQueryOptions(ctx)...)
228+
res, err := c.session.StreamExecuteScanQuery(ctx,
229+
query,
230+
toQueryParams(args),
231+
scanQueryOptions(ctx)...,
232+
)
208233
if err != nil {
209234
return nil, c.checkClosed(err)
210235
}

internal/xsql/context.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,14 @@ func txControl(ctx context.Context, defaultTxControl *table.TransactionControl)
3838
return defaultTxControl
3939
}
4040

41-
func WithScanQueryOptions(ctx context.Context, opts []options.ExecuteScanQueryOption) context.Context {
42-
return context.WithValue(ctx, ctxScanQueryOptionsKey{}, append(scanQueryOptions(ctx), opts...))
41+
func WithScanQueryOptions(ctx context.Context, opts ...options.ExecuteScanQueryOption) context.Context {
42+
return context.WithValue(ctx,
43+
ctxScanQueryOptionsKey{},
44+
append(
45+
append([]options.ExecuteScanQueryOption{}, scanQueryOptions(ctx)...),
46+
opts...,
47+
),
48+
)
4349
}
4450

4551
func scanQueryOptions(ctx context.Context) []options.ExecuteScanQueryOption {
@@ -49,8 +55,14 @@ func scanQueryOptions(ctx context.Context) []options.ExecuteScanQueryOption {
4955
return nil
5056
}
5157

52-
func WithDataQueryOptions(ctx context.Context, opts []options.ExecuteDataQueryOption) context.Context {
53-
return context.WithValue(ctx, ctxDataQueryOptionsKey{}, append(dataQueryOptions(ctx), opts...))
58+
func WithDataQueryOptions(ctx context.Context, opts ...options.ExecuteDataQueryOption) context.Context {
59+
return context.WithValue(ctx,
60+
ctxDataQueryOptionsKey{},
61+
append(
62+
append([]options.ExecuteDataQueryOption{}, dataQueryOptions(ctx)...),
63+
opts...,
64+
),
65+
)
5466
}
5567

5668
func dataQueryOptions(ctx context.Context) []options.ExecuteDataQueryOption {
@@ -59,3 +71,7 @@ func dataQueryOptions(ctx context.Context) []options.ExecuteDataQueryOption {
5971
}
6072
return nil
6173
}
74+
75+
func withKeepInCache(ctx context.Context) context.Context {
76+
return WithDataQueryOptions(ctx, options.WithKeepInCache(true))
77+
}

internal/xsql/stmt.go

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@ import (
77

88
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
99

10-
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1111
)
1212

1313
type stmt struct {
1414
nopResult
1515
namedValueChecker
1616

1717
conn *conn
18+
tx *tx
1819
params map[string]*Ydb.Type
1920
query string
2021
}
@@ -31,19 +32,22 @@ func (s *stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driv
3132
if s.conn.isClosed() {
3233
return nil, errClosedConn
3334
}
34-
switch m := queryModeFromContext(ctx, s.conn.defaultQueryMode); m {
35+
m := queryModeFromContext(ctx, s.conn.defaultQueryMode)
36+
if s.tx != nil {
37+
if m != DataQueryMode {
38+
return nil, xerrors.WithStackTrace(
39+
fmt.Errorf("query mode `%s` not supported with prepared statement", m.String()),
40+
)
41+
}
42+
return s.tx.QueryContext(withKeepInCache(ctx), s.query, args)
43+
}
44+
switch m {
3545
case DataQueryMode:
3646
_, res, err := s.conn.session.Execute(ctx,
3747
txControl(ctx, s.conn.defaultTxControl),
3848
s.query,
3949
toQueryParams(args),
40-
append(
41-
append(
42-
[]options.ExecuteDataQueryOption{},
43-
dataQueryOptions(ctx)...,
44-
),
45-
options.WithKeepInCache(true),
46-
)...,
50+
dataQueryOptions(withKeepInCache(ctx))...,
4751
)
4852
if err != nil {
4953
return nil, s.conn.checkClosed(err)
@@ -63,19 +67,22 @@ func (s *stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (drive
6367
if s.conn.isClosed() {
6468
return nil, errClosedConn
6569
}
66-
switch m := queryModeFromContext(ctx, s.conn.defaultQueryMode); m {
70+
m := queryModeFromContext(ctx, s.conn.defaultQueryMode)
71+
if s.tx != nil {
72+
if m != DataQueryMode {
73+
return nil, xerrors.WithStackTrace(
74+
fmt.Errorf("query mode `%s` not supported with prepared statement", m.String()),
75+
)
76+
}
77+
return s.tx.ExecContext(withKeepInCache(ctx), s.query, args)
78+
}
79+
switch m {
6780
case DataQueryMode:
6881
_, res, err := s.conn.session.Execute(ctx,
6982
txControl(ctx, s.conn.defaultTxControl),
7083
s.query,
7184
toQueryParams(args),
72-
append(
73-
append(
74-
[]options.ExecuteDataQueryOption{},
75-
dataQueryOptions(ctx)...,
76-
),
77-
options.WithKeepInCache(true),
78-
)...,
85+
dataQueryOptions(withKeepInCache(ctx))...,
7986
)
8087
if err != nil {
8188
return nil, s.conn.checkClosed(err)

internal/xsql/tx.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func (tx *tx) QueryContext(ctx context.Context, query string, args []driver.Name
2929
if tx.conn.isClosed() {
3030
return nil, errClosedConn
3131
}
32-
res, err := tx.transaction.Execute(ctx, query, toQueryParams(args))
32+
res, err := tx.transaction.Execute(ctx, query, toQueryParams(args), dataQueryOptions(ctx)...)
3333
if err != nil {
3434
return nil, tx.conn.checkClosed(err)
3535
}
@@ -45,7 +45,7 @@ func (tx *tx) ExecContext(ctx context.Context, query string, args []driver.Named
4545
if tx.conn.isClosed() {
4646
return nil, errClosedConn
4747
}
48-
res, err := tx.transaction.Execute(ctx, query, toQueryParams(args))
48+
res, err := tx.transaction.Execute(ctx, query, toQueryParams(args), dataQueryOptions(ctx)...)
4949
if err != nil {
5050
return nil, tx.conn.checkClosed(err)
5151
}

internal/xsql/xsql.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"database/sql/driver"
55

66
"github.com/ydb-platform/ydb-go-sdk/v3/table"
7-
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
87
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
98
)
109

@@ -21,13 +20,3 @@ func toQueryParams(values []driver.NamedValue) *table.QueryParameters {
2120
}
2221
return table.NewQueryParameters(opts...)
2322
}
24-
25-
func toSchemeOptions(values []driver.NamedValue) (opts []options.ExecuteSchemeQueryOption) {
26-
if len(values) == 0 {
27-
return nil
28-
}
29-
for _, arg := range values {
30-
opts = append(opts, arg.Value.(options.ExecuteSchemeQueryOption))
31-
}
32-
return opts
33-
}

retry/sql.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"database/sql"
66
"database/sql/driver"
7+
"fmt"
78

89
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
910
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql/isolation"
@@ -51,19 +52,23 @@ func WithTxSettings(txControl *table.TransactionSettings) DoTxOption {
5152

5253
// DoTx is a shortcut for calling Do(ctx, f) on initialized TxDoer with DB field set to given db.
5354
func DoTx(ctx context.Context, db *sql.DB, f TxOperationFunc, opts ...DoTxOption) error {
54-
options := doTxOptions{
55-
txOptions: &sql.TxOptions{
56-
Isolation: sql.LevelDefault,
57-
ReadOnly: false,
58-
},
59-
idempotent: false,
60-
}
55+
var (
56+
options = doTxOptions{
57+
txOptions: &sql.TxOptions{
58+
Isolation: sql.LevelDefault,
59+
ReadOnly: false,
60+
},
61+
idempotent: false,
62+
}
63+
attempts = 0
64+
)
6165
for _, o := range opts {
6266
if err := o(&options); err != nil {
6367
return xerrors.WithStackTrace(err)
6468
}
6569
}
6670
err := Retry(ctx, func(ctx context.Context) (err error) {
71+
attempts++
6772
tx, err := db.BeginTx(ctx, options.txOptions)
6873
if err != nil {
6974
return xerrors.WithStackTrace(err)
@@ -85,7 +90,9 @@ func DoTx(ctx context.Context, db *sql.DB, f TxOperationFunc, opts ...DoTxOption
8590
return nil
8691
}, WithIdempotent(options.idempotent))
8792
if err != nil {
88-
return xerrors.WithStackTrace(err)
93+
return xerrors.WithStackTrace(
94+
fmt.Errorf("opration failed with %d attempts: %w", attempts, err),
95+
)
8996
}
9097
return nil
9198
}

retry/sql_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package retry
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"database/sql/driver"
7+
"testing"
8+
"time"
9+
10+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
11+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
12+
13+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
14+
)
15+
16+
type mock struct{}
17+
18+
func (c mock) Commit() error {
19+
return nil
20+
}
21+
22+
func (c mock) Rollback() error {
23+
return nil
24+
}
25+
26+
func (c mock) Open(name string) (driver.Conn, error) {
27+
return c, nil
28+
}
29+
30+
func (c mock) Prepare(query string) (driver.Stmt, error) {
31+
return nil, nil
32+
}
33+
34+
func (c mock) Close() error {
35+
return nil
36+
}
37+
38+
func (c mock) Begin() (driver.Tx, error) {
39+
return c, nil
40+
}
41+
42+
func (c mock) Connect(ctx context.Context) (driver.Conn, error) {
43+
return c, nil
44+
}
45+
46+
func (c mock) Driver() driver.Driver {
47+
return c
48+
}
49+
50+
func (c mock) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
51+
return c, nil
52+
}
53+
54+
func TestDoTxAbortedTLI(t *testing.T) {
55+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
56+
defer cancel()
57+
db := sql.OpenDB(mock{})
58+
var counter int
59+
if err := DoTx(ctx, db, func(ctx context.Context, tx *sql.Tx) error {
60+
counter++
61+
if counter > 10 {
62+
return nil
63+
}
64+
return xerrors.Operation(
65+
xerrors.WithStatusCode(Ydb.StatusIds_ABORTED),
66+
xerrors.WithIssues([]*Ydb_Issue.IssueMessage{
67+
{IssueCode: 2001, Message: "Transaction locks invalidated"},
68+
}),
69+
)
70+
}); err != nil {
71+
t.Errorf("retry operation failed: %v", err)
72+
}
73+
if counter <= 1 {
74+
t.Errorf("nothing attempts: %d", counter)
75+
}
76+
}

0 commit comments

Comments
 (0)