Skip to content

Commit b51a47c

Browse files
authored
Merge pull request #335 from ydb-platform/retry-do-tx
* Added `retry.DoTx` helper for retrying `database/sql` transactions
2 parents 6f3cb34 + cf0e82e commit b51a47c

File tree

12 files changed

+542
-165
lines changed

12 files changed

+542
-165
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Added `retry.DoTx` helper for retrying `database/sql` transactions
12
* Implemented `database/sql` driver over `ydb-go-sdk`
23
* Marked as deprecated `trace.Table.OnPoolSessionNew` and `trace.Table.OnPoolSessionClose` events
34
* Added `trace.Table.OnPoolSessionAdd` and `trace.Table.OnPoolSessionRemove` events

internal/scheme/client.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,14 +153,20 @@ func (c *Client) DescribePath(ctx context.Context, path string) (e scheme.Entry,
153153
}
154154
call := func(ctx context.Context) error {
155155
e, err = c.describePath(ctx, path)
156-
return xerrors.WithStackTrace(err)
156+
if err != nil {
157+
return xerrors.WithStackTrace(err)
158+
}
159+
return nil
157160
}
158161
if !c.config.AutoRetry() {
159162
err = call(ctx)
160163
return
161164
}
162165
err = retry.Retry(ctx, call, retry.WithIdempotent(true), retry.WithStackTrace())
163-
return
166+
if err != nil {
167+
return e, xerrors.WithStackTrace(err)
168+
}
169+
return e, nil
164170
}
165171

166172
func (c *Client) describePath(ctx context.Context, path string) (e scheme.Entry, err error) {
@@ -223,7 +229,10 @@ func (c *Client) modifyPermissions(ctx context.Context, path string, opts ...sch
223229
),
224230
},
225231
)
226-
return xerrors.WithStackTrace(err)
232+
if err != nil {
233+
return xerrors.WithStackTrace(err)
234+
}
235+
return nil
227236
}
228237

229238
func putEntry(dst []scheme.Entry, src []*Ydb_Scheme.Entry) {

internal/table/statement.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ type statement struct {
1818
params map[string]*Ydb.Type
1919
}
2020

21+
func Params(s table.Statement) map[string]*Ydb.Type {
22+
return s.(*statement).params
23+
}
24+
2125
// Execute executes prepared data query.
2226
func (s *statement) Execute(
2327
ctx context.Context, tx *table.TransactionControl,

internal/xsql/conn.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import (
77
"fmt"
88
"sync/atomic"
99

10+
internal "github.com/ydb-platform/ydb-go-sdk/v3/internal/table"
1011
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql/isolation"
1113
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
1214
"github.com/ydb-platform/ydb-go-sdk/v3/table"
1315
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
@@ -114,17 +116,17 @@ func (c *conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, e
114116
return nil, c.checkClosed(err)
115117
}
116118
return &stmt{
117-
conn: c,
118-
statement: s,
119-
query: query,
119+
conn: c,
120+
params: internal.Params(s),
121+
query: query,
120122
}, nil
121123
}
122124

123-
func (c *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
125+
func (c *conn) BeginTx(ctx context.Context, txOptions driver.TxOptions) (driver.Tx, error) {
124126
if c.isClosed() {
125127
return nil, errClosedConn
126128
}
127-
txSettings, err := mapIsolation(opts)
129+
txSettings, err := isolation.ToYDB(txOptions)
128130
if err != nil {
129131
return nil, xerrors.WithStackTrace(err)
130132
}

internal/xsql/dsn.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package xsql
2+
3+
import (
4+
"fmt"
5+
"net/url"
6+
7+
"github.com/ydb-platform/ydb-go-sdk/v3/config"
8+
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
9+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
10+
)
11+
12+
func Parse(dataSourceName string) (opts []config.Option, connectorOpts []ConnectorOption, err error) {
13+
uri, err := url.Parse(dataSourceName)
14+
if err != nil {
15+
return nil, nil, xerrors.WithStackTrace(err)
16+
}
17+
if token := uri.Query().Get("token"); token != "" {
18+
opts = append(opts, config.WithCredentials(credentials.NewAccessTokenCredentials(token)))
19+
}
20+
if queryMode := uri.Query().Get("query_mode"); queryMode != "" {
21+
mode := QueryModeFromString(queryMode)
22+
if mode == UnknownQueryMode {
23+
return nil, nil, xerrors.WithStackTrace(fmt.Errorf("unknown query mode: %s", queryMode))
24+
}
25+
connectorOpts = append(connectorOpts, WithDefaultQueryMode(mode))
26+
}
27+
return opts, connectorOpts, nil
28+
}

internal/xsql/isolation.go

Lines changed: 0 additions & 48 deletions
This file was deleted.
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package isolation
2+
3+
import (
4+
"database/sql"
5+
"database/sql/driver"
6+
"errors"
7+
"fmt"
8+
9+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table"
10+
11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/table"
13+
)
14+
15+
var errUnsupportedReadOnlyMode = errors.New("unsupported read-only mode")
16+
17+
// ToYDB maps driver transaction options to ydb transaction Option or query transaction control.
18+
// This caused by ydb logic that prevents start actual transaction with OnlineReadOnly mode and ReadCommitted
19+
// and ReadUncommitted isolation levels should use tx_control in every query request.
20+
// It returns error on unsupported options.
21+
func ToYDB(opts driver.TxOptions) (txSettings *table.TransactionSettings, err error) {
22+
level := sql.IsolationLevel(opts.Isolation)
23+
if !opts.ReadOnly {
24+
switch level {
25+
case sql.LevelDefault,
26+
sql.LevelSerializable:
27+
return table.TxSettings(table.WithSerializableReadWrite()), nil
28+
default:
29+
return nil, xerrors.WithStackTrace(fmt.Errorf(
30+
"ydb: unsupported transaction options: isolation=%s read_only=%t",
31+
nameIsolationLevel(level), opts.ReadOnly,
32+
))
33+
}
34+
}
35+
return nil, xerrors.WithStackTrace(errUnsupportedReadOnlyMode)
36+
}
37+
38+
func nameIsolationLevel(x sql.IsolationLevel) string {
39+
if int(x) < len(isolationLevelName) {
40+
return isolationLevelName[x]
41+
}
42+
return "unknown_isolation"
43+
}
44+
45+
var isolationLevelName = [...]string{
46+
sql.LevelDefault: "default",
47+
sql.LevelReadUncommitted: "read_uncommitted",
48+
sql.LevelReadCommitted: "read_committed",
49+
sql.LevelWriteCommitted: "write_committed",
50+
sql.LevelRepeatableRead: "repeatable_read",
51+
sql.LevelSnapshot: "snapshot",
52+
sql.LevelSerializable: "serializable",
53+
sql.LevelLinearizable: "linearizable",
54+
}
55+
56+
// FromYDB maps table transaction settings to driver transaction options
57+
func FromYDB(txSettings *table.TransactionSettings) (txOptions *sql.TxOptions, err error) {
58+
switch txMode := txSettings.Settings().TxMode.(type) {
59+
case *Ydb_Table.TransactionSettings_SerializableReadWrite:
60+
return &sql.TxOptions{
61+
Isolation: sql.LevelSerializable,
62+
ReadOnly: false,
63+
}, nil
64+
case *Ydb_Table.TransactionSettings_OnlineReadOnly:
65+
txOptions = &sql.TxOptions{
66+
Isolation: sql.LevelReadCommitted,
67+
ReadOnly: true,
68+
}
69+
if txMode.OnlineReadOnly.AllowInconsistentReads {
70+
txOptions.Isolation = sql.LevelReadUncommitted
71+
}
72+
return txOptions, nil
73+
case *Ydb_Table.TransactionSettings_StaleReadOnly:
74+
return &sql.TxOptions{
75+
Isolation: sql.LevelReadCommitted,
76+
ReadOnly: true,
77+
}, nil
78+
default:
79+
return nil, xerrors.WithStackTrace(fmt.Errorf(
80+
"ydb: unsupported transaction options: txMode=%T(%v)",
81+
txMode, txMode,
82+
))
83+
}
84+
}

0 commit comments

Comments
 (0)