Skip to content

Commit a9c49bc

Browse files
committed
fix initializing of lazy transactions + refactored traces
1 parent cc1d1de commit a9c49bc

File tree

10 files changed

+143
-163
lines changed

10 files changed

+143
-163
lines changed

internal/table/session.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -671,26 +671,31 @@ func (s *session) Execute(
671671
return nil, nil, xerrors.WithStackTrace(err)
672672
}
673673

674-
return s.executeQueryResult(result, txControl)
674+
return s.executeQueryResult(result, request.TxControl)
675675
}
676676

677677
// executeQueryResult returns Transaction and result built from received
678678
// result.
679-
func (s *session) executeQueryResult(res *Ydb_Table.ExecuteQueryResult, txControl *table.TransactionControl) (
679+
func (s *session) executeQueryResult(
680+
res *Ydb_Table.ExecuteQueryResult, txControl *Ydb_Table.TransactionControl,
681+
) (
680682
table.Transaction, result.Result, error,
681683
) {
682-
t := &transaction{
683-
id: res.GetTxMeta().GetId(),
684-
state: txStateInitialized,
685-
s: s,
686-
control: txControl,
684+
tx := &transaction{
685+
id: res.GetTxMeta().GetId(),
686+
s: s,
687687
}
688-
r := scanner.NewUnary(
688+
if txControl.CommitTx {
689+
tx.state = txStateCommitted
690+
} else {
691+
tx.state = txStateInitialized
692+
tx.control = table.TxControl(table.WithTxID(tx.id))
693+
}
694+
return tx, scanner.NewUnary(
689695
res.GetResultSets(),
690696
res.GetQueryStats(),
691697
scanner.WithIgnoreTruncated(s.config.IgnoreTruncated()),
692-
)
693-
return t, r, nil
698+
), nil
694699
}
695700

696701
// executeDataQuery executes data query.

internal/table/statement.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,21 +63,20 @@ func (s *statement) Execute(
6363
onDone(txr, true, r, err)
6464
}()
6565

66-
return s.execute(ctx, a, request, txControl)
66+
return s.execute(ctx, a, request, request.TxControl)
6767
}
6868

6969
// execute executes prepared query without any tracing.
7070
func (s *statement) execute(
7171
ctx context.Context, a *allocator.Allocator,
72-
request *Ydb_Table.ExecuteDataQueryRequest, txControl *table.TransactionControl,
72+
request *Ydb_Table.ExecuteDataQueryRequest, txControl *Ydb_Table.TransactionControl,
7373
) (
7474
txr table.Transaction, r result.Result, err error,
7575
) {
7676
res, err := s.session.executeDataQuery(ctx, a, request)
7777
if err != nil {
7878
return nil, nil, xerrors.WithStackTrace(err)
7979
}
80-
8180
return s.session.executeQueryResult(res, txControl)
8281
}
8382

internal/table/transaction.go

Lines changed: 98 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,14 @@ import (
1818
)
1919

2020
var (
21-
errTxInvalidatedWithCommit = xerrors.Wrap(fmt.Errorf("transaction invalidated from WithCommit() call"))
22-
errTxAlreadyCommitted = xerrors.Wrap(fmt.Errorf("transaction already committed"))
23-
errTxRollbackedEarly = xerrors.Wrap(fmt.Errorf("transaction rollbacked early"))
21+
errTxAlreadyCommitted = xerrors.Wrap(fmt.Errorf("transaction already committed"))
22+
errTxRollbackedEarly = xerrors.Wrap(fmt.Errorf("transaction rollbacked early"))
2423
)
2524

2625
type txState int32
2726

2827
const (
2928
txStateInitialized = iota
30-
txStateInvalidatedWithCommit
3129
txStateCommitted
3230
txStateRollbacked
3331
)
@@ -49,48 +47,30 @@ func (tx *transaction) Execute(
4947
query string, params *table.QueryParameters,
5048
opts ...options.ExecuteDataQueryOption,
5149
) (r result.Result, err error) {
50+
onDone := trace.TableOnSessionTransactionExecute(
51+
tx.s.config.Trace(), &ctx, tx.s, tx, queryFromText(query), params, false,
52+
)
53+
defer func() {
54+
onDone(r, err)
55+
}()
56+
5257
switch txState(atomic.LoadInt32((*int32)(&tx.state))) {
53-
case txStateInvalidatedWithCommit:
54-
return nil, xerrors.WithStackTrace(errTxInvalidatedWithCommit)
5558
case txStateCommitted:
5659
return nil, xerrors.WithStackTrace(errTxAlreadyCommitted)
5760
case txStateRollbacked:
5861
return nil, xerrors.WithStackTrace(errTxRollbackedEarly)
5962
default:
60-
}
61-
var (
62-
a = allocator.New()
63-
q = queryFromText(query)
64-
optsResult options.ExecuteDataQueryDesc
65-
)
66-
defer a.Free()
63+
_, r, err = tx.s.Execute(ctx, tx.control, query, params, opts...)
64+
if err != nil {
65+
return nil, xerrors.WithStackTrace(err)
66+
}
6767

68-
if params == nil {
69-
params = table.NewQueryParameters()
70-
}
68+
if tx.control.Desc().CommitTx {
69+
atomic.StoreInt32((*int32)(&tx.state), txStateCommitted)
70+
}
7171

72-
for _, f := range opts {
73-
f(&optsResult, a)
72+
return r, nil
7473
}
75-
76-
onDone := trace.TableOnSessionTransactionExecute(
77-
tx.s.config.Trace(),
78-
&ctx,
79-
tx.s,
80-
tx,
81-
q,
82-
params,
83-
optsResult.QueryCachePolicy.GetKeepInCache(),
84-
)
85-
defer func() {
86-
onDone(r, err)
87-
}()
88-
_, r, err = tx.s.Execute(ctx, tx.control, query, params, opts...)
89-
if err != nil {
90-
return nil, xerrors.WithStackTrace(err)
91-
}
92-
93-
return r, nil
9474
}
9575

9676
// ExecuteStatement executes prepared statement stmt within transaction tx.
@@ -99,15 +79,6 @@ func (tx *transaction) ExecuteStatement(
9979
stmt table.Statement, params *table.QueryParameters,
10080
opts ...options.ExecuteDataQueryOption,
10181
) (r result.Result, err error) {
102-
switch txState(atomic.LoadInt32((*int32)(&tx.state))) {
103-
case txStateInvalidatedWithCommit:
104-
return nil, xerrors.WithStackTrace(errTxInvalidatedWithCommit)
105-
case txStateCommitted:
106-
return nil, xerrors.WithStackTrace(errTxAlreadyCommitted)
107-
case txStateRollbacked:
108-
return nil, xerrors.WithStackTrace(errTxRollbackedEarly)
109-
default:
110-
}
11182
if params == nil {
11283
params = table.NewQueryParameters()
11384
}
@@ -126,29 +97,29 @@ func (tx *transaction) ExecuteStatement(
12697
&ctx,
12798
tx.s,
12899
tx,
100+
stmt.(*statement).query,
129101
params,
130102
)
131103
defer func() {
132104
onDone(r, err)
133105
}()
134106

135-
_, r, err = stmt.Execute(ctx, tx.control, params, opts...)
136-
if err != nil {
137-
return nil, xerrors.WithStackTrace(err)
138-
}
107+
switch txState(atomic.LoadInt32((*int32)(&tx.state))) {
108+
case txStateCommitted:
109+
return nil, xerrors.WithStackTrace(errTxAlreadyCommitted)
110+
case txStateRollbacked:
111+
return nil, xerrors.WithStackTrace(errTxRollbackedEarly)
112+
default:
113+
_, r, err = stmt.Execute(ctx, tx.control, params, opts...)
114+
if err != nil {
115+
return nil, xerrors.WithStackTrace(err)
116+
}
139117

140-
return r, nil
141-
}
118+
if tx.control.Desc().CommitTx {
119+
atomic.StoreInt32((*int32)(&tx.state), txStateCommitted)
120+
}
142121

143-
func (tx *transaction) WithCommit() table.TransactionActor {
144-
defer func() {
145-
atomic.StoreInt32((*int32)(&tx.state), txStateInvalidatedWithCommit)
146-
}()
147-
return &transaction{
148-
id: tx.id,
149-
s: tx.s,
150-
control: table.TxControl(table.WithTxID(tx.id), table.CommitTx()),
151-
state: txState(atomic.LoadInt32((*int32)(&tx.state))),
122+
return r, nil
152123
}
153124
}
154125

@@ -157,20 +128,6 @@ func (tx *transaction) CommitTx(
157128
ctx context.Context,
158129
opts ...options.CommitTransactionOption,
159130
) (r result.Result, err error) {
160-
switch txState(atomic.LoadInt32((*int32)(&tx.state))) {
161-
case txStateInvalidatedWithCommit:
162-
return nil, xerrors.WithStackTrace(errTxInvalidatedWithCommit)
163-
case txStateCommitted:
164-
return nil, xerrors.WithStackTrace(errTxAlreadyCommitted)
165-
case txStateRollbacked:
166-
return nil, xerrors.WithStackTrace(errTxRollbackedEarly)
167-
default:
168-
defer func() {
169-
if err == nil {
170-
atomic.StoreInt32((*int32)(&tx.state), txStateCommitted)
171-
}
172-
}()
173-
}
174131
onDone := trace.TableOnSessionTransactionCommit(
175132
tx.s.config.Trace(),
176133
&ctx,
@@ -180,56 +137,54 @@ func (tx *transaction) CommitTx(
180137
defer func() {
181138
onDone(err)
182139
}()
183-
var (
184-
request = &Ydb_Table.CommitTransactionRequest{
185-
SessionId: tx.s.id,
186-
TxId: tx.id,
187-
OperationParams: operation.Params(
188-
ctx,
189-
tx.s.config.OperationTimeout(),
190-
tx.s.config.OperationCancelAfter(),
191-
operation.ModeSync,
192-
),
140+
141+
switch txState(atomic.LoadInt32((*int32)(&tx.state))) {
142+
case txStateCommitted:
143+
return nil, xerrors.WithStackTrace(errTxAlreadyCommitted)
144+
case txStateRollbacked:
145+
return nil, xerrors.WithStackTrace(errTxRollbackedEarly)
146+
default:
147+
var (
148+
request = &Ydb_Table.CommitTransactionRequest{
149+
SessionId: tx.s.id,
150+
TxId: tx.id,
151+
OperationParams: operation.Params(
152+
ctx,
153+
tx.s.config.OperationTimeout(),
154+
tx.s.config.OperationCancelAfter(),
155+
operation.ModeSync,
156+
),
157+
}
158+
response *Ydb_Table.CommitTransactionResponse
159+
result = new(Ydb_Table.CommitTransactionResult)
160+
)
161+
162+
for _, opt := range opts {
163+
opt((*options.CommitTransactionDesc)(request))
193164
}
194-
response *Ydb_Table.CommitTransactionResponse
195-
result = new(Ydb_Table.CommitTransactionResult)
196-
)
197165

198-
for _, opt := range opts {
199-
opt((*options.CommitTransactionDesc)(request))
200-
}
166+
response, err = tx.s.tableService.CommitTransaction(ctx, request)
167+
if err != nil {
168+
return nil, xerrors.WithStackTrace(err)
169+
}
201170

202-
response, err = tx.s.tableService.CommitTransaction(ctx, request)
203-
if err != nil {
204-
return nil, xerrors.WithStackTrace(err)
205-
}
206-
err = response.GetOperation().GetResult().UnmarshalTo(result)
207-
if err != nil {
208-
return nil, xerrors.WithStackTrace(err)
171+
err = response.GetOperation().GetResult().UnmarshalTo(result)
172+
if err != nil {
173+
return nil, xerrors.WithStackTrace(err)
174+
}
175+
176+
atomic.StoreInt32((*int32)(&tx.state), txStateCommitted)
177+
178+
return scanner.NewUnary(
179+
nil,
180+
result.GetQueryStats(),
181+
scanner.WithIgnoreTruncated(tx.s.config.IgnoreTruncated()),
182+
), nil
209183
}
210-
return scanner.NewUnary(
211-
nil,
212-
result.GetQueryStats(),
213-
scanner.WithIgnoreTruncated(tx.s.config.IgnoreTruncated()),
214-
), nil
215184
}
216185

217186
// Rollback performs a rollback of the specified active transaction.
218187
func (tx *transaction) Rollback(ctx context.Context) (err error) {
219-
switch txState(atomic.LoadInt32((*int32)(&tx.state))) {
220-
case txStateInvalidatedWithCommit:
221-
return xerrors.WithStackTrace(errTxInvalidatedWithCommit)
222-
case txStateCommitted:
223-
return xerrors.WithStackTrace(errTxAlreadyCommitted)
224-
case txStateRollbacked:
225-
return xerrors.WithStackTrace(errTxRollbackedEarly)
226-
default:
227-
defer func() {
228-
if err == nil {
229-
atomic.StoreInt32((*int32)(&tx.state), txStateRollbacked)
230-
}
231-
}()
232-
}
233188
onDone := trace.TableOnSessionTransactionRollback(
234189
tx.s.config.Trace(),
235190
&ctx,
@@ -240,17 +195,30 @@ func (tx *transaction) Rollback(ctx context.Context) (err error) {
240195
onDone(err)
241196
}()
242197

243-
_, err = tx.s.tableService.RollbackTransaction(ctx,
244-
&Ydb_Table.RollbackTransactionRequest{
245-
SessionId: tx.s.id,
246-
TxId: tx.id,
247-
OperationParams: operation.Params(
248-
ctx,
249-
tx.s.config.OperationTimeout(),
250-
tx.s.config.OperationCancelAfter(),
251-
operation.ModeSync,
252-
),
253-
},
254-
)
255-
return xerrors.WithStackTrace(err)
198+
switch txState(atomic.LoadInt32((*int32)(&tx.state))) {
199+
case txStateCommitted:
200+
return nil // nop for committed tx
201+
case txStateRollbacked:
202+
return xerrors.WithStackTrace(errTxRollbackedEarly)
203+
default:
204+
_, err = tx.s.tableService.RollbackTransaction(ctx,
205+
&Ydb_Table.RollbackTransactionRequest{
206+
SessionId: tx.s.id,
207+
TxId: tx.id,
208+
OperationParams: operation.Params(
209+
ctx,
210+
tx.s.config.OperationTimeout(),
211+
tx.s.config.OperationCancelAfter(),
212+
operation.ModeSync,
213+
),
214+
},
215+
)
216+
if err != nil {
217+
return xerrors.WithStackTrace(err)
218+
}
219+
220+
atomic.StoreInt32((*int32)(&tx.state), txStateRollbacked)
221+
222+
return nil
223+
}
256224
}

internal/table/transaction_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table"
1010
"google.golang.org/protobuf/proto"
1111

12+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/table"
1314
"github.com/ydb-platform/ydb-go-sdk/v3/testutil"
1415
)
@@ -113,11 +114,11 @@ func TestTxSkipRollbackForCommitted(t *testing.T) {
113114
t.Fatalf("unexpected rollback: %d", begin)
114115
}
115116
_, err = x.CommitTx(context.Background())
116-
if err != nil {
117-
t.Fatal(err)
117+
if !xerrors.Is(err, errTxRollbackedEarly) {
118+
t.Fatal("must be errTxRollbackedEarly")
118119
}
119-
if commit != 2 {
120-
t.Fatalf("unexpected commit: %d", begin)
120+
if commit != 1 {
121+
t.Fatalf("unexpected commit: %d", commit)
121122
}
122123
}
123124
}

0 commit comments

Comments
 (0)