@@ -2,6 +2,7 @@ package query
22
33import (
44 "context"
5+ "fmt"
56
67 "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
78 "github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
@@ -23,13 +24,19 @@ var (
2324 _ baseTx.Transaction = (* Transaction )(nil )
2425)
2526
27+ const (
28+ LazyTxID = "LAZY_TX"
29+ )
30+
2631type (
2732 Transaction struct {
2833 baseTx.Identifier
2934
3035 s * Session
3136 txSettings query.TransactionSettings
3237
38+ completed bool
39+
3340 onCompleted xsync.Set [* baseTx.OnTransactionCompletedFunc ]
3441 }
3542)
@@ -77,12 +84,14 @@ func (tx *Transaction) QueryResultSet(
7784 onDone (finalErr )
7885 }()
7986
80- settings := options .ExecuteSettings (
81- append (
82- []options.Execute {options .WithTxControl (tx .txControl ())},
83- opts ... ,
84- )... ,
85- )
87+ if tx .completed {
88+ return nil , xerrors .WithStackTrace (errExecuteOnCompletedTx )
89+ }
90+
91+ settings , err := tx .executeSettings (opts ... )
92+ if err != nil {
93+ return nil , xerrors .WithStackTrace (err )
94+ }
8695
8796 resultOpts := []resultOption {
8897 withTrace (tx .s .cfg .Trace ()),
@@ -101,7 +110,15 @@ func (tx *Transaction) QueryResultSet(
101110 return nil , xerrors .WithStackTrace (err )
102111 }
103112
104- if tx .Identifier == nil {
113+ if settings .TxControl ().Commit {
114+ if txID != nil {
115+ return nil , xerrors .WithStackTrace (errUnexpectedTxIDOnCommitFlag )
116+ }
117+ tx .completed = true
118+ } else if tx .Identifier == nil {
119+ if txID == nil {
120+ return nil , xerrors .WithStackTrace (errExpectedTxID )
121+ }
105122 tx .Identifier = txID
106123 }
107124
@@ -189,12 +206,14 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu
189206 onDone (finalErr )
190207 }()
191208
192- settings := options .ExecuteSettings (
193- append (
194- []options.Execute {options .WithTxControl (tx .txControl ())},
195- opts ... ,
196- )... ,
197- )
209+ if tx .completed {
210+ return xerrors .WithStackTrace (errExecuteOnCompletedTx )
211+ }
212+
213+ settings , err := tx .executeSettings (opts ... )
214+ if err != nil {
215+ return xerrors .WithStackTrace (err )
216+ }
198217
199218 resultOpts := []resultOption {
200219 withTrace (tx .s .cfg .Trace ()),
@@ -214,7 +233,15 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu
214233 return xerrors .WithStackTrace (err )
215234 }
216235
217- if tx .Identifier == nil {
236+ if settings .TxControl ().Commit {
237+ if txID != nil {
238+ return xerrors .WithStackTrace (errUnexpectedTxIDOnCommitFlag )
239+ }
240+ tx .completed = true
241+ } else if tx .Identifier == nil {
242+ if txID == nil {
243+ return xerrors .WithStackTrace (errExpectedTxID )
244+ }
218245 tx .Identifier = txID
219246 }
220247
@@ -226,6 +253,37 @@ func (tx *Transaction) Exec(ctx context.Context, q string, opts ...options.Execu
226253 return nil
227254}
228255
256+ func (tx * Transaction ) executeSettings (opts ... options.Execute ) (_ executeSettings , _ error ) {
257+ for _ , opt := range opts {
258+ if opt == nil {
259+ return nil , xerrors .WithStackTrace (errExpectedTxID )
260+ }
261+ if _ , has := opt .(options.ExecuteNoTx ); has {
262+ return nil , xerrors .WithStackTrace (
263+ fmt .Errorf ("%T: %w" , opt , ErrOptionNotForTxExecute ),
264+ )
265+ }
266+ }
267+
268+ if tx .Identifier != nil {
269+ return options .ExecuteSettings (append ([]options.Execute {
270+ options .WithTxControl (
271+ queryTx .NewControl (
272+ queryTx .WithTxID (tx .Identifier .ID ()),
273+ ),
274+ ),
275+ }, opts ... )... ), nil
276+ }
277+
278+ return options .ExecuteSettings (append ([]options.Execute {
279+ options .WithTxControl (
280+ queryTx .NewControl (
281+ queryTx .BeginTx (tx .txSettings ... ),
282+ ),
283+ ),
284+ }, opts ... )... ), nil
285+ }
286+
229287func (tx * Transaction ) Query (ctx context.Context , q string , opts ... options.Execute ) (
230288 _ query.Result , finalErr error ,
231289) {
@@ -235,12 +293,14 @@ func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Exec
235293 onDone (finalErr )
236294 }()
237295
238- settings := options .ExecuteSettings (
239- append (
240- []options.Execute {options .WithTxControl (tx .txControl ())},
241- opts ... ,
242- )... ,
243- )
296+ if tx .completed {
297+ return nil , xerrors .WithStackTrace (errExecuteOnCompletedTx )
298+ }
299+
300+ settings , err := tx .executeSettings (opts ... )
301+ if err != nil {
302+ return nil , xerrors .WithStackTrace (err )
303+ }
244304
245305 resultOpts := []resultOption {
246306 withTrace (tx .s .cfg .Trace ()),
@@ -259,7 +319,17 @@ func (tx *Transaction) Query(ctx context.Context, q string, opts ...options.Exec
259319 return nil , xerrors .WithStackTrace (err )
260320 }
261321
262- if tx .Identifier == nil {
322+ if settings .TxControl ().Commit {
323+ if txID != nil {
324+ return nil , xerrors .WithStackTrace (errUnexpectedTxIDOnCommitFlag )
325+ }
326+ tx .completed = true
327+
328+ return r , nil
329+ } else if tx .Identifier == nil {
330+ if txID == nil {
331+ return nil , xerrors .WithStackTrace (errExpectedTxID )
332+ }
263333 tx .Identifier = txID
264334 }
265335
@@ -281,6 +351,7 @@ func commitTx(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessi
281351func (tx * Transaction ) CommitTx (ctx context.Context ) (err error ) {
282352 defer func () {
283353 tx .notifyOnCompleted (err )
354+ tx .completed = true
284355 }()
285356
286357 if tx .Identifier == nil {
@@ -316,6 +387,8 @@ func (tx *Transaction) Rollback(ctx context.Context) error {
316387 return nil
317388 }
318389
390+ tx .completed = true
391+
319392 tx .notifyOnCompleted (ErrTransactionRollingBack )
320393
321394 err := rollback (ctx , tx .s .queryServiceClient , tx .s .id , tx .ID ())
0 commit comments