Skip to content

Commit 976b95e

Browse files
committed
added TestNoEffectsIfForgetCommitTx
1 parent 9e5787d commit 976b95e

File tree

4 files changed

+181
-12
lines changed

4 files changed

+181
-12
lines changed

internal/xsql/conn.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -413,9 +413,9 @@ func (c *conn) BeginTx(ctx context.Context, txOptions driver.TxOptions) (tx driv
413413
}()
414414

415415
if c.currentTx != nil {
416-
return nil, xerrors.WithStackTrace(
417-
fmt.Errorf("conn already have an opened currentTx: %s", c.currentTx.ID()),
418-
)
416+
return nil, xerrors.WithStackTrace(&ErrConnAlreadyHaveTx{
417+
currentTx: c.currentTx.ID(),
418+
})
419419
}
420420

421421
m := queryModeFromContext(ctx, c.defaultQueryMode)

internal/xsql/errors.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,21 @@ var (
1313
errConnClosedEarly = xerrors.Retryable(errors.New("conn closed early"), xerrors.WithDeleteSession())
1414
errNotReadyConn = xerrors.Retryable(errors.New("conn not ready"), xerrors.WithDeleteSession())
1515
)
16+
17+
type ErrConnAlreadyHaveTx struct {
18+
currentTx string
19+
}
20+
21+
func (err *ErrConnAlreadyHaveTx) Error() string {
22+
return "conn already have an opened currentTx: " + err.currentTx
23+
}
24+
25+
func (err *ErrConnAlreadyHaveTx) As(target interface{}) bool {
26+
switch t := target.(type) {
27+
case *ErrConnAlreadyHaveTx:
28+
t.currentTx = err.currentTx
29+
return true
30+
default:
31+
return false
32+
}
33+
}

tests/integration/database_sql_regression_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,16 @@ import (
2525
)
2626

2727
func TestRegressionCloud109307(t *testing.T) {
28-
scope := newScope(t)
29-
db := scope.SQLDriverWithFolder(
30-
ydb.WithTablePathPrefix(scope.Folder()),
31-
ydb.WithAutoDeclare(),
28+
var (
29+
ctx = xtest.Context(t)
30+
scope = newScope(t)
31+
db = scope.SQLDriverWithFolder(
32+
ydb.WithTablePathPrefix(scope.Folder()),
33+
ydb.WithAutoDeclare(),
34+
)
3235
)
3336

34-
ctx, cancel := context.WithTimeout(xtest.Context(t), 42*time.Second)
37+
ctx, cancel := context.WithTimeout(ctx, 42*time.Second)
3538
defer cancel()
3639

3740
for i := int64(1); ; i++ {
@@ -75,15 +78,12 @@ func TestRegressionCloud109307(t *testing.T) {
7578

7679
func TestRegressionKikimr17104(t *testing.T) {
7780
var (
81+
ctx = xtest.Context(t)
7882
scope = newScope(t)
7983
db = scope.SQLDriverWithFolder(
8084
ydb.WithTablePathPrefix(scope.Folder()),
8185
ydb.WithAutoDeclare(),
8286
)
83-
)
84-
85-
var (
86-
ctx = xtest.Context(t)
8787
tableName = t.Name()
8888
upsertRowsCount = 100000
8989
upsertChecksum uint64

tests/integration/tx_test.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
//go:build integration
2+
// +build integration
3+
4+
package integration
5+
6+
import (
7+
"database/sql"
8+
"testing"
9+
10+
"github.com/stretchr/testify/require"
11+
12+
"github.com/ydb-platform/ydb-go-sdk/v3"
13+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql"
14+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
15+
"github.com/ydb-platform/ydb-go-sdk/v3/table"
16+
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/indexed"
17+
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
18+
)
19+
20+
func TestNoEffectsIfForgetCommitTx(t *testing.T) {
21+
var (
22+
ctx = xtest.Context(t)
23+
scope = newScope(t)
24+
nativeDriver = scope.Driver()
25+
db = scope.SQLDriver(ydb.WithTablePathPrefix(scope.Folder()), ydb.WithAutoDeclare(), ydb.WithNumericArgs())
26+
tablePath = scope.TablePath() // for auto-create table
27+
)
28+
29+
t.Run("native", func(t *testing.T) {
30+
id := uint64(123)
31+
32+
// create session
33+
s, err := nativeDriver.Table().CreateSession(ctx) //nolint:
34+
require.NoError(t, err)
35+
36+
// tx1 (without commit)
37+
tx1, err := s.BeginTransaction(ctx, table.TxSettings(table.WithSerializableReadWrite()))
38+
require.NoError(t, err)
39+
40+
// upsert data inside tx1
41+
_, err = tx1.Execute(ctx, `
42+
DECLARE $p1 AS Uint64;
43+
DECLARE $p2 AS Text;
44+
UPSERT INTO `+"`"+tablePath+"`"+` (
45+
id, val
46+
) VALUES (
47+
$p1, $p2
48+
);`,
49+
table.NewQueryParameters(
50+
table.ValueParam("$p1", types.Uint64Value(id)),
51+
table.ValueParam("$p2", types.TextValue("1st tx")),
52+
),
53+
)
54+
require.NoError(t, err)
55+
56+
// check for NO persist data from tx1
57+
_, result, err := s.Execute(ctx, table.DefaultTxControl(), `
58+
DECLARE $p1 AS Uint64;
59+
SELECT val FROM `+"`"+tablePath+"`"+`
60+
WHERE id = $p1;`,
61+
table.NewQueryParameters(
62+
table.ValueParam("$p1", types.Uint64Value(id)),
63+
),
64+
)
65+
require.NoError(t, err)
66+
require.NoError(t, result.NextResultSetErr(ctx))
67+
require.False(t, result.NextRow())
68+
69+
// tx2 (with commit)
70+
tx2, err := s.BeginTransaction(ctx, table.TxSettings(table.WithSerializableReadWrite()))
71+
require.NoError(t, err)
72+
73+
// check for NO data from tx1
74+
result, err = tx2.Execute(ctx, `
75+
DECLARE $p1 AS Uint64;
76+
SELECT val FROM `+"`"+tablePath+"`"+`
77+
WHERE id = $p1;`,
78+
table.NewQueryParameters(
79+
table.ValueParam("$p1", types.Uint64Value(id)),
80+
),
81+
)
82+
require.NoError(t, err)
83+
require.NoError(t, result.NextResultSetErr(ctx))
84+
require.False(t, result.NextRow())
85+
86+
// upsert data inside tx2
87+
_, err = tx2.Execute(ctx, `
88+
DECLARE $p1 AS Uint64;
89+
DECLARE $p2 AS Text;
90+
UPSERT INTO `+"`"+tablePath+"`"+` (
91+
id, val
92+
) VALUES (
93+
$p1, $p2
94+
);`,
95+
table.NewQueryParameters(
96+
table.ValueParam("$p1", types.Uint64Value(id)),
97+
table.ValueParam("$p2", types.TextValue("2nd tx")),
98+
),
99+
)
100+
require.NoError(t, err)
101+
// commit tx2
102+
_, err = tx2.CommitTx(ctx)
103+
require.NoError(t, err)
104+
105+
// check for persist data from tx2
106+
_, result, err = s.Execute(ctx, table.DefaultTxControl(), `
107+
DECLARE $p1 AS Uint64;
108+
SELECT val FROM `+"`"+tablePath+"`"+`
109+
WHERE id = $p1;`,
110+
table.NewQueryParameters(
111+
table.ValueParam("$p1", types.Uint64Value(id)),
112+
),
113+
)
114+
require.NoError(t, err)
115+
require.NoError(t, result.NextResultSetErr(ctx))
116+
require.True(t, result.NextRow())
117+
118+
var value *string
119+
require.NoError(t, result.Scan(indexed.Optional(&value)))
120+
require.NoError(t, result.Err())
121+
122+
require.NotNil(t, value)
123+
require.Equal(t, "2nd tx", *value)
124+
})
125+
126+
t.Run("database/sql", func(t *testing.T) {
127+
id := uint64(456)
128+
129+
// create connection === YDB table session
130+
cc, err := db.Conn(ctx)
131+
require.NoError(t, err)
132+
133+
// first tx with no commit
134+
tx1, err := cc.BeginTx(ctx, &sql.TxOptions{})
135+
require.NoError(t, err)
136+
_, err = tx1.ExecContext(ctx, `UPSERT INTO table (id, val) VALUES ($1, $2)`, id, "1st tx")
137+
require.NoError(t, err)
138+
139+
// check row for NO write
140+
var (
141+
value string
142+
errConnAlreadyHaveTx *xsql.ErrConnAlreadyHaveTx
143+
)
144+
err = db.QueryRowContext(ctx, `SELECT val FROM table WHERE id = $1`, id).Scan(&value)
145+
require.ErrorIs(t, err, sql.ErrNoRows)
146+
147+
// second tx on existing conn === session
148+
_, err = cc.BeginTx(ctx, &sql.TxOptions{})
149+
require.ErrorAs(t, err, &errConnAlreadyHaveTx)
150+
})
151+
}

0 commit comments

Comments
 (0)