Skip to content

Commit 244f96f

Browse files
committed
* Supported read-only sql.LevelSnapshot isolation with fake transaction and OnlineReadOnly transaction control (transient, while YDB clusters are not updated with true snapshot isolation mode)
1 parent 98b3648 commit 244f96f

File tree

9 files changed

+162
-100
lines changed

9 files changed

+162
-100
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Supported read-only `sql.LevelSnapshot` isolation with fake transaction and `OnlineReadOnly` transaction control (transient, while YDB clusters are not updated with true snapshot isolation mode)
12
* Supported the `*sql.Conn` as input type `ydb.Unwrap` helper for go's 1.18
23

34
## v3.36.2

SQL.md

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,9 @@ if err = rows.Err(); err != nil { // always check final rows err
8787

8888
### With transaction <a name="queries-tx"></a>
8989
Supports only `default` transaction options which mapped to `YDB` `SerializableReadWrite` transaction settings.
90+
9091
`YDB`'s `OnlineReadOnly` and `StaleReadOnly` transaction settings are not compatible with interactive transactions such as `database/sql`'s `*sql.Tx`.
91-
`YDB`'s `OnlineReadOnly` and `StaleReadOnly` transaction settings can be explicitly applied to each query outside interactive transaction (see more in [Isolation levels support](#tx-control))
92+
That's why `ydb-go-sdk` implements read-only `sql.LevelSnapshot` with fake transaction (transient, while YDB clusters are not updated with true snapshot isolation mode)
9293
```go
9394
tx, err := db.BeginTx(ctx, sql.TxOptions{})
9495
if err != nil {
@@ -191,14 +192,19 @@ import (
191192
)
192193
...
193194
err := retry.DoTx(context.TODO(), db, func(ctx context.Context, tx *sql.Tx) error {
194-
// work with tx
195-
rows, err := tx.QueryContext(ctx, "SELECT 1;")
196-
if err != nil {
197-
return err // return err to retryer
198-
}
199-
...
200-
return nil // good final of retry tx operation
201-
}, retry.WithDoTxRetryOptions(retry.WithIdempotent(true)))
195+
// work with tx
196+
rows, err := tx.QueryContext(ctx, "SELECT 1;")
197+
if err != nil {
198+
return err // return err to retryer
199+
}
200+
...
201+
return nil // good final of retry tx operation
202+
}, retry.WithDoTxRetryOptions(
203+
retry.WithIdempotent(true),
204+
), retry.WithTxOptions(&sql.TxOptions{
205+
Isolation: sql.LevelSnapshot,
206+
ReadOnly: true,
207+
}))
202208
```
203209

204210
## Query args types <a name="arg-types"></a>

internal/xsql/conn.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,14 @@ type conn struct {
6262

6363
scanOpts []options.ExecuteScanQueryOption
6464

65-
currentTx *tx
65+
currentTx currentTx
66+
}
67+
68+
type currentTx interface {
69+
driver.Tx
70+
driver.ExecerContext
71+
driver.QueryerContext
72+
table.TransactionIdentifier
6673
}
6774

6875
var (
@@ -309,6 +316,19 @@ func (c *conn) BeginTx(ctx context.Context, txOptions driver.TxOptions) (_ drive
309316
fmt.Errorf("conn already have an opened currentTx: %s", c.currentTx.ID()),
310317
)
311318
}
319+
// TODO: replace with true transaction with snapshot read-only isolation after implementing it on server-side
320+
//nolint:godox
321+
if txOptions.ReadOnly && txOptions.Isolation == driver.IsolationLevel(sql.LevelSnapshot) {
322+
c.currentTx = &fakeTx{
323+
conn: c,
324+
txControl: table.TxControl(
325+
table.BeginTx(table.WithSerializableReadWrite()),
326+
table.CommitTx(),
327+
),
328+
ctx: ctx,
329+
}
330+
return c.currentTx, nil
331+
}
312332
var txc table.TxOption
313333
txc, err = isolation.ToYDB(txOptions)
314334
if err != nil {

internal/xsql/fake_tx.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package xsql
2+
3+
// TODO: drop this after implementing it on server-side
4+
//nolint:godox
5+
6+
import (
7+
"context"
8+
"database/sql/driver"
9+
10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/retry"
11+
"github.com/ydb-platform/ydb-go-sdk/v3/table"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
13+
)
14+
15+
type fakeTx struct {
16+
txControl *table.TransactionControl
17+
conn *conn
18+
ctx context.Context
19+
}
20+
21+
func (tx *fakeTx) ID() string {
22+
return "fakeTx"
23+
}
24+
25+
func (tx *fakeTx) Commit() (err error) {
26+
onDone := trace.DatabaseSQLOnTxCommit(tx.conn.trace, &tx.ctx, tx)
27+
defer func() {
28+
onDone(err)
29+
}()
30+
if tx.conn.isClosed() {
31+
return errClosedConn
32+
}
33+
defer func() {
34+
tx.conn.currentTx = nil
35+
}()
36+
return nil
37+
}
38+
39+
func (tx *fakeTx) Rollback() (err error) {
40+
onDone := trace.DatabaseSQLOnTxRollback(tx.conn.trace, &tx.ctx, tx)
41+
defer func() {
42+
onDone(err)
43+
}()
44+
if tx.conn.isClosed() {
45+
return errClosedConn
46+
}
47+
defer func() {
48+
tx.conn.currentTx = nil
49+
}()
50+
return nil
51+
}
52+
53+
func (tx *fakeTx) QueryContext(ctx context.Context, q string, args []driver.NamedValue) (_ driver.Rows, err error) {
54+
onDone := trace.DatabaseSQLOnTxQuery(tx.conn.trace, &ctx, tx.ctx, tx, q, retry.IsIdempotent(ctx))
55+
defer func() {
56+
onDone(err)
57+
}()
58+
return tx.conn.QueryContext(WithTxControl(ctx, tx.txControl), q, args)
59+
}
60+
61+
func (tx *fakeTx) ExecContext(ctx context.Context, q string, args []driver.NamedValue) (_ driver.Result, err error) {
62+
onDone := trace.DatabaseSQLOnTxExec(tx.conn.trace, &ctx, tx.ctx, tx, q, retry.IsIdempotent(ctx))
63+
defer func() {
64+
onDone(err)
65+
}()
66+
return tx.conn.ExecContext(WithTxControl(ctx, tx.txControl), q, args)
67+
}

internal/xsql/isolation/isolation.go

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"database/sql/driver"
66
"fmt"
77

8-
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table"
9-
108
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
119
"github.com/ydb-platform/ydb-go-sdk/v3/table"
1210
)
@@ -15,28 +13,12 @@ import (
1513
// This caused by ydb logic that prevents start actual transaction with OnlineReadOnly mode and ReadCommitted
1614
// and ReadUncommitted isolation levels should use tx_control in every query request.
1715
// It returns error on unsupported options.
18-
func ToYDB(opts driver.TxOptions) (txControl table.TxOption, err error) {
16+
func ToYDB(opts driver.TxOptions) (txcControl table.TxOption, err error) {
1917
level := sql.IsolationLevel(opts.Isolation)
2018
if !opts.ReadOnly && level == sql.LevelDefault {
2119
return table.WithSerializableReadWrite(), nil
2220
}
2321
return nil, xerrors.WithStackTrace(fmt.Errorf(
24-
"ydb: unsupported transaction options: isolation='%s' read_only='%t'",
25-
level.String(), opts.ReadOnly,
22+
"ydb: unsupported transaction options: %+v", opts,
2623
))
2724
}
28-
29-
// FromYDB maps table transaction settings to driver transaction options
30-
func FromYDB(txSettings *table.TransactionSettings) (txOptions *sql.TxOptions, err error) {
31-
switch txSettings.Settings().TxMode.(type) {
32-
case *Ydb_Table.TransactionSettings_SerializableReadWrite:
33-
return &sql.TxOptions{
34-
Isolation: sql.LevelDefault,
35-
ReadOnly: false,
36-
}, nil
37-
default:
38-
return nil, xerrors.WithStackTrace(
39-
fmt.Errorf("ydb: unsupported transaction settings: %+v", txSettings),
40-
)
41-
}
42-
}

internal/xsql/isolation/isolation_test.go

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -148,43 +148,3 @@ func TestToYDB(t *testing.T) {
148148
})
149149
}
150150
}
151-
152-
func TestFromYDB(t *testing.T) {
153-
for _, tt := range []struct {
154-
txControl table.TxOption
155-
txOptions *sql.TxOptions
156-
err bool
157-
}{
158-
{
159-
txControl: table.WithSerializableReadWrite(),
160-
txOptions: &sql.TxOptions{
161-
Isolation: sql.LevelDefault,
162-
ReadOnly: false,
163-
},
164-
},
165-
{
166-
txControl: table.WithOnlineReadOnly(),
167-
err: true,
168-
},
169-
{
170-
txControl: table.WithOnlineReadOnly(table.WithInconsistentReads()),
171-
err: true,
172-
},
173-
{
174-
txControl: table.WithStaleReadOnly(),
175-
err: true,
176-
},
177-
} {
178-
t.Run(fmt.Sprintf("%+v", table.TxSettings(tt.txControl).Settings()), func(t *testing.T) {
179-
fromYDB, err := FromYDB(table.TxSettings(tt.txControl))
180-
if tt.err {
181-
require.Error(t, err)
182-
} else {
183-
require.NoError(t, err)
184-
if *fromYDB != *tt.txOptions {
185-
t.Errorf("%+v != %+v", *fromYDB, *tt.txOptions)
186-
}
187-
}
188-
})
189-
}
190-
}

retry/sql.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ func Do(ctx context.Context, db *sql.DB, f func(ctx context.Context, cc *sql.Con
5757
}
5858

5959
type doTxOptions struct {
60+
txOptions *sql.TxOptions
6061
retryOptions []retryOption
6162
}
6263

@@ -71,14 +72,23 @@ func WithDoTxRetryOptions(opts ...retryOption) doTxOption {
7172
}
7273
}
7374

75+
// WithTxOptions specified transaction options
76+
func WithTxOptions(txOptions *sql.TxOptions) doTxOption {
77+
return func(o *doTxOptions) error {
78+
o.txOptions = txOptions
79+
return nil
80+
}
81+
}
82+
7483
// DoTx is a retryer of database/sql transactions with fallbacks on errors
7584
func DoTx(ctx context.Context, db *sql.DB, f func(context.Context, *sql.Tx) error, opts ...doTxOption) error {
7685
var (
77-
txOptions = &sql.TxOptions{
78-
Isolation: sql.LevelDefault,
79-
ReadOnly: false,
86+
options = doTxOptions{
87+
txOptions: &sql.TxOptions{
88+
Isolation: sql.LevelDefault,
89+
ReadOnly: false,
90+
},
8091
}
81-
options = doTxOptions{}
8292
attempts = 0
8393
)
8494
for _, o := range opts {
@@ -88,7 +98,7 @@ func DoTx(ctx context.Context, db *sql.DB, f func(context.Context, *sql.Tx) erro
8898
}
8999
err := Retry(ctx, func(ctx context.Context) (err error) {
90100
attempts++
91-
tx, err := db.BeginTx(ctx, txOptions)
101+
tx, err := db.BeginTx(ctx, options.txOptions)
92102
if err != nil {
93103
return unwrapErrBadConn(xerrors.WithStackTrace(err))
94104
}

sql_e2e_test.go

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -165,31 +165,38 @@ func TestDatabaseSql(t *testing.T) {
165165
if err != nil {
166166
t.Fatalf("begin tx failed: %v\n", err)
167167
}
168-
err = retry.DoTx(ctx, db, func(ctx context.Context, tx *sql.Tx) error {
169-
row := tx.QueryRowContext(ctx,
170-
render(
171-
querySelect,
172-
templateConfig{
173-
TablePathPrefix: path.Join(cc.Name(), folder),
174-
},
175-
),
176-
sql.Named("seriesID", uint64(1)),
177-
sql.Named("seasonID", uint64(1)),
178-
sql.Named("episodeID", uint64(1)),
179-
)
180-
var views sql.NullFloat64
181-
if err = row.Scan(&views); err != nil {
182-
return fmt.Errorf("cannot select current views: %w", err)
183-
}
184-
if !views.Valid {
185-
return fmt.Errorf("unexpected invalid views: %v", views)
186-
}
187-
t.Logf("views = %v", views)
188-
if views.Float64 != 1 {
189-
return fmt.Errorf("unexpected views value: %v", views)
190-
}
191-
return nil
192-
}, retry.WithDoTxRetryOptions(retry.WithIdempotent(true)))
168+
err = retry.DoTx(ctx, db,
169+
func(ctx context.Context, tx *sql.Tx) error {
170+
row := tx.QueryRowContext(ctx,
171+
render(
172+
querySelect,
173+
templateConfig{
174+
TablePathPrefix: path.Join(cc.Name(), folder),
175+
},
176+
),
177+
sql.Named("seriesID", uint64(1)),
178+
sql.Named("seasonID", uint64(1)),
179+
sql.Named("episodeID", uint64(1)),
180+
)
181+
var views sql.NullFloat64
182+
if err = row.Scan(&views); err != nil {
183+
return fmt.Errorf("cannot select current views: %w", err)
184+
}
185+
if !views.Valid {
186+
return fmt.Errorf("unexpected invalid views: %v", views)
187+
}
188+
t.Logf("views = %v", views)
189+
if views.Float64 != 1 {
190+
return fmt.Errorf("unexpected views value: %v", views)
191+
}
192+
return nil
193+
},
194+
retry.WithDoTxRetryOptions(retry.WithIdempotent(true)),
195+
retry.WithTxOptions(&sql.TxOptions{
196+
Isolation: sql.LevelSnapshot,
197+
ReadOnly: true,
198+
}),
199+
)
193200
if err != nil {
194201
t.Fatalf("begin tx failed: %v\n", err)
195202
}

table/table.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,15 @@ func DefaultTxControl() *TransactionControl {
364364
)
365365
}
366366

367+
// SerializableReadWriteTxControl returns transaction control with serializable read-write isolation mode
368+
func SerializableReadWriteTxControl(opts ...TxControlOption) *TransactionControl {
369+
return TxControl(
370+
append([]TxControlOption{
371+
BeginTx(WithSerializableReadWrite()),
372+
}, opts...)...,
373+
)
374+
}
375+
367376
// OnlineReadOnlyTxControl returns online read-only transaction control
368377
func OnlineReadOnlyTxControl(opts ...TxOnlineReadOnlyOption) *TransactionControl {
369378
return TxControl(

0 commit comments

Comments
 (0)