Skip to content

Commit a85f633

Browse files
authored
Merge pull request #1427 from ydb-platform/do-tx-settings
Do tx settings
2 parents 9217d77 + 108eb8a commit a85f633

File tree

4 files changed

+311
-55
lines changed

4 files changed

+311
-55
lines changed

internal/query/errors.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@ import (
77
)
88

99
var (
10-
ErrTransactionRollingBack = xerrors.Wrap(errors.New("ydb: the transaction is rolling back"))
11-
ErrNotImplemented = errors.New("not implemented yet")
12-
errWrongNextResultSetIndex = errors.New("wrong result set index")
13-
errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index")
14-
errMoreThanOneRow = errors.New("unexpected more than one row in result set")
15-
errMoreThanOneResultSet = errors.New("unexpected more than one result set")
16-
errNoResultSets = errors.New("no result sets")
10+
ErrTransactionRollingBack = xerrors.Wrap(errors.New("ydb: the transaction is rolling back"))
11+
errWrongNextResultSetIndex = errors.New("wrong result set index")
12+
errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index")
13+
errMoreThanOneRow = errors.New("unexpected more than one row in result set")
14+
errMoreThanOneResultSet = errors.New("unexpected more than one result set")
15+
errNoResultSets = errors.New("no result sets")
16+
errUnexpectedTxIDOnCommitFlag = errors.New("unexpected transaction ID on commit flag")
17+
errExpectedTxID = errors.New("expected transaction ID but nil")
18+
ErrOptionNotForTxExecute = errors.New("option is not for execute on transaction")
19+
errExecuteOnCompletedTx = errors.New("execute on completed transaction")
1720
)

internal/query/options/execute.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ type (
4242
applyExecuteOption(s *executeSettings)
4343
}
4444

45+
ExecuteNoTx interface {
46+
thisOptionIsNotForExecuteOnTx()
47+
}
48+
4549
// execute options
4650
callOptionsOption []grpc.CallOption
4751
txCommitOption struct{}
@@ -71,6 +75,8 @@ func (txControl *txControlOption) applyExecuteOption(s *executeSettings) {
7175
s.txControl = (*tx.Control)(txControl)
7276
}
7377

78+
func (txControl *txControlOption) thisOptionIsNotForExecuteOnTx() {}
79+
7480
func (syntax Syntax) applyExecuteOption(s *executeSettings) {
7581
s.syntax = syntax
7682
}

internal/query/transaction.go

Lines changed: 94 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package query
22

33
import (
44
"context"
5+
"fmt"
56

67
"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
78
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
@@ -23,13 +24,19 @@ var (
2324
_ baseTx.Transaction = (*Transaction)(nil)
2425
)
2526

27+
const (
28+
LazyTxID = "LAZY_TX"
29+
)
30+
2631
type (
2732
Transaction struct {
2833
baseTx.Identifier
2934

3035
s *Session
3136
txSettings query.TransactionSettings
3237

38+
completed bool
39+
3340
onCompleted xsync.Set[*baseTx.OnTransactionCompletedFunc]
3441
}
3542
)
@@ -77,12 +84,14 @@ func (tx *Transaction) QueryResultSet(
7784
onDone(finalErr)
7885
}()
7986

80-
settings := options.ExecuteSettings(
81-
append(
82-
[]options.Execute{options.WithTxControl(tx.txControl())},
83-
opts...,
84-
)...,
85-
)
87+
if tx.completed {
88+
return nil, xerrors.WithStackTrace(errExecuteOnCompletedTx)
89+
}
90+
91+
settings, err := tx.executeSettings(opts...)
92+
if err != nil {
93+
return nil, xerrors.WithStackTrace(err)
94+
}
8695

8796
resultOpts := []resultOption{
8897
withTrace(tx.s.cfg.Trace()),
@@ -101,7 +110,15 @@ func (tx *Transaction) QueryResultSet(
101110
return nil, xerrors.WithStackTrace(err)
102111
}
103112

104-
if tx.Identifier == nil {
113+
if settings.TxControl().Commit {
114+
if txID != nil {
115+
return nil, xerrors.WithStackTrace(errUnexpectedTxIDOnCommitFlag)
116+
}
117+
tx.completed = true
118+
} else if tx.Identifier == nil {
119+
if txID == nil {
120+
return nil, xerrors.WithStackTrace(errExpectedTxID)
121+
}
105122
tx.Identifier = txID
106123
}
107124

@@ -189,12 +206,14 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu
189206
onDone(finalErr)
190207
}()
191208

192-
settings := options.ExecuteSettings(
193-
append(
194-
[]options.Execute{options.WithTxControl(tx.txControl())},
195-
opts...,
196-
)...,
197-
)
209+
if tx.completed {
210+
return xerrors.WithStackTrace(errExecuteOnCompletedTx)
211+
}
212+
213+
settings, err := tx.executeSettings(opts...)
214+
if err != nil {
215+
return xerrors.WithStackTrace(err)
216+
}
198217

199218
resultOpts := []resultOption{
200219
withTrace(tx.s.cfg.Trace()),
@@ -214,7 +233,15 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu
214233
return xerrors.WithStackTrace(err)
215234
}
216235

217-
if tx.Identifier == nil {
236+
if settings.TxControl().Commit {
237+
if txID != nil {
238+
return xerrors.WithStackTrace(errUnexpectedTxIDOnCommitFlag)
239+
}
240+
tx.completed = true
241+
} else if tx.Identifier == nil {
242+
if txID == nil {
243+
return xerrors.WithStackTrace(errExpectedTxID)
244+
}
218245
tx.Identifier = txID
219246
}
220247

@@ -226,6 +253,37 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu
226253
return nil
227254
}
228255

256+
func (tx *Transaction) executeSettings(opts ...options.Execute) (_ executeSettings, _ error) {
257+
for _, opt := range opts {
258+
if opt == nil {
259+
return nil, xerrors.WithStackTrace(errExpectedTxID)
260+
}
261+
if _, has := opt.(options.ExecuteNoTx); has {
262+
return nil, xerrors.WithStackTrace(
263+
fmt.Errorf("%T: %w", opt, ErrOptionNotForTxExecute),
264+
)
265+
}
266+
}
267+
268+
if tx.Identifier != nil {
269+
return options.ExecuteSettings(append([]options.Execute{
270+
options.WithTxControl(
271+
queryTx.NewControl(
272+
queryTx.WithTxID(tx.Identifier.ID()),
273+
),
274+
),
275+
}, opts...)...), nil
276+
}
277+
278+
return options.ExecuteSettings(append([]options.Execute{
279+
options.WithTxControl(
280+
queryTx.NewControl(
281+
queryTx.BeginTx(tx.txSettings...),
282+
),
283+
),
284+
}, opts...)...), nil
285+
}
286+
229287
func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Execute) (
230288
_ query.Result, finalErr error,
231289
) {
@@ -235,12 +293,14 @@ func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Exec
235293
onDone(finalErr)
236294
}()
237295

238-
settings := options.ExecuteSettings(
239-
append(
240-
[]options.Execute{options.WithTxControl(tx.txControl())},
241-
opts...,
242-
)...,
243-
)
296+
if tx.completed {
297+
return nil, xerrors.WithStackTrace(errExecuteOnCompletedTx)
298+
}
299+
300+
settings, err := tx.executeSettings(opts...)
301+
if err != nil {
302+
return nil, xerrors.WithStackTrace(err)
303+
}
244304

245305
resultOpts := []resultOption{
246306
withTrace(tx.s.cfg.Trace()),
@@ -259,7 +319,17 @@ func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Exec
259319
return nil, xerrors.WithStackTrace(err)
260320
}
261321

262-
if tx.Identifier == nil {
322+
if settings.TxControl().Commit {
323+
if txID != nil {
324+
return nil, xerrors.WithStackTrace(errUnexpectedTxIDOnCommitFlag)
325+
}
326+
tx.completed = true
327+
328+
return r, nil
329+
} else if tx.Identifier == nil {
330+
if txID == nil {
331+
return nil, xerrors.WithStackTrace(errExpectedTxID)
332+
}
263333
tx.Identifier = txID
264334
}
265335

@@ -281,6 +351,7 @@ func commitTx(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessi
281351
func (tx *Transaction) CommitTx(ctx context.Context) (err error) {
282352
defer func() {
283353
tx.notifyOnCompleted(err)
354+
tx.completed = true
284355
}()
285356

286357
if tx.Identifier == nil {
@@ -316,6 +387,8 @@ func (tx *Transaction) Rollback(ctx context.Context) error {
316387
return nil
317388
}
318389

390+
tx.completed = true
391+
319392
tx.notifyOnCompleted(ErrTransactionRollingBack)
320393

321394
err := rollback(ctx, tx.s.queryServiceClient, tx.s.id, tx.ID())

0 commit comments

Comments
 (0)