Skip to content

Commit 577ddef

Browse files
committed
lazy transactions: query.WithCommit() option + tests
1 parent 6e652e3 commit 577ddef

File tree

4 files changed

+109
-23
lines changed

4 files changed

+109
-23
lines changed

internal/query/transaction.go

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,10 @@ func (tx transaction) ID() string {
2222
return tx.id
2323
}
2424

25-
func fromTxOptions(txID string, txOpts ...query.TxExecuteOption) executeSettings {
26-
opts := make([]query.ExecuteOption, 0, len(txOpts)+1)
27-
for _, opt := range txOpts {
28-
if executeOpt, has := opt.(query.ExecuteOption); has {
29-
opts = append(opts, executeOpt)
30-
}
31-
}
32-
opts = append(opts, query.WithTxControl(query.TxControl(query.WithTxID(txID))))
33-
34-
return query.ExecuteSettings(opts...)
35-
}
36-
3725
func (tx transaction) Execute(ctx context.Context, q string, opts ...query.TxExecuteOption) (
3826
r query.Result, err error,
3927
) {
40-
_, res, err := execute(ctx, tx.s, tx.s.queryClient, q, fromTxOptions(tx.id, opts...))
28+
_, res, err := execute(ctx, tx.s, tx.s.queryClient, q, query.TxExecuteSettings(tx.id, opts...).ExecuteSettings)
4129
if err != nil {
4230
return nil, xerrors.WithStackTrace(err)
4331
}

internal/query/transaction_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func (s testExecuteSettings) CallOptions() []grpc.CallOption {
137137

138138
var _ executeSettings = testExecuteSettings{}
139139

140-
func TestFromTxOptions(t *testing.T) {
140+
func TestTxExecuteSettings(t *testing.T) {
141141
for _, tt := range []struct {
142142
name string
143143
txID string
@@ -224,7 +224,7 @@ func TestFromTxOptions(t *testing.T) {
224224
} {
225225
t.Run(tt.name, func(t *testing.T) {
226226
a := allocator.New()
227-
settings := fromTxOptions(tt.txID, tt.txOpts...)
227+
settings := query.TxExecuteSettings(tt.txID, tt.txOpts...).ExecuteSettings
228228
require.Equal(t, tt.settings.Syntax(), settings.Syntax())
229229
require.Equal(t, tt.settings.ExecMode(), settings.ExecMode())
230230
require.Equal(t, tt.settings.StatsMode(), settings.StatsMode())

query/execute_options.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,23 @@ type (
2828
applyExecuteOption(s *executeSettings)
2929
}
3030
txExecuteSettings struct {
31-
commonExecuteSettings
31+
ExecuteSettings *executeSettings
32+
33+
commitTx bool
3234
}
3335
TxExecuteOption interface {
3436
applyTxExecuteOption(s *txExecuteSettings)
3537
}
38+
txCommitOption struct{}
3639
parametersOption params.Parameters
3740
)
3841

42+
func (t txCommitOption) applyTxExecuteOption(s *txExecuteSettings) {
43+
s.commitTx = true
44+
}
45+
3946
func (syntax Syntax) applyTxExecuteOption(s *txExecuteSettings) {
40-
s.syntax = syntax
47+
syntax.applyExecuteOption(s.ExecuteSettings)
4148
}
4249

4350
func (syntax Syntax) applyExecuteOption(s *executeSettings) {
@@ -50,7 +57,7 @@ const (
5057
)
5158

5259
func (params parametersOption) applyTxExecuteOption(s *txExecuteSettings) {
53-
s.params = append(s.params, params...)
60+
params.applyExecuteOption(s.ExecuteSettings)
5461
}
5562

5663
func (params parametersOption) applyExecuteOption(s *executeSettings) {
@@ -62,19 +69,19 @@ func (opts callOptions) applyExecuteOption(s *executeSettings) {
6269
}
6370

6471
func (opts callOptions) applyTxExecuteOption(s *txExecuteSettings) {
65-
s.callOptions = append(s.callOptions, opts...)
72+
opts.applyExecuteOption(s.ExecuteSettings)
6673
}
6774

6875
func (mode StatsMode) applyTxExecuteOption(s *txExecuteSettings) {
69-
s.statsMode = mode
76+
mode.applyExecuteOption(s.ExecuteSettings)
7077
}
7178

7279
func (mode StatsMode) applyExecuteOption(s *executeSettings) {
7380
s.statsMode = mode
7481
}
7582

7683
func (mode ExecMode) applyTxExecuteOption(s *txExecuteSettings) {
77-
s.execMode = mode
84+
mode.applyExecuteOption(s.ExecuteSettings)
7885
}
7986

8087
func (mode ExecMode) applyExecuteOption(s *executeSettings) {
@@ -144,9 +151,9 @@ func (s *commonExecuteSettings) Params() *params.Parameters {
144151
return &s.params
145152
}
146153

147-
func TxExecuteSettings(opts ...TxExecuteOption) (settings *txExecuteSettings) {
154+
func TxExecuteSettings(id string, opts ...TxExecuteOption) (settings *txExecuteSettings) {
148155
settings = &txExecuteSettings{
149-
commonExecuteSettings: defaultCommonExecuteSettings(),
156+
ExecuteSettings: ExecuteSettings(WithTxControl(TxControl(WithTxID(id)))),
150157
}
151158
for _, opt := range opts {
152159
opt.applyTxExecuteOption(settings)
@@ -168,8 +175,13 @@ var (
168175
_ ExecuteOption = StatsMode(0)
169176
_ TxExecuteOption = ExecMode(0)
170177
_ TxExecuteOption = StatsMode(0)
178+
_ TxExecuteOption = txCommitOption{}
171179
)
172180

181+
func WithCommit() txCommitOption {
182+
return txCommitOption{}
183+
}
184+
173185
func WithExecMode(mode ExecMode) ExecMode {
174186
return mode
175187
}

tests/integration/query_execute_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package integration
55

66
import (
77
"context"
8+
"fmt"
89
"os"
910
"testing"
1011
"time"
@@ -168,4 +169,89 @@ func TestQueryExecute(t *testing.T) {
168169
require.EqualValues(t, time.Duration(100500000000), data.P3)
169170
require.Nil(t, data.P4)
170171
})
172+
t.Run("Transaction", func(t *testing.T) {
173+
t.Run("Explicit", func(t *testing.T) {
174+
err = db.Query().Do(ctx, func(ctx context.Context, s query.Session) (err error) {
175+
tx, err := s.Begin(ctx, query.TxSettings(query.WithSerializableReadWrite()))
176+
if err != nil {
177+
return err
178+
}
179+
res, err := tx.Execute(ctx, `SELECT 1`)
180+
if err != nil {
181+
return err
182+
}
183+
rs, err := res.NextResultSet(ctx)
184+
if err != nil {
185+
return err
186+
}
187+
row, err := rs.NextRow(ctx)
188+
if err != nil {
189+
return err
190+
}
191+
var v int32
192+
err = row.Scan(&v)
193+
if err != nil {
194+
return err
195+
}
196+
if v != 1 {
197+
return fmt.Errorf("unexpected value from database: %d", v)
198+
}
199+
if err = res.Err(); err != nil {
200+
return err
201+
}
202+
return tx.CommitTx(ctx)
203+
}, query.WithIdempotent())
204+
require.NoError(t, err)
205+
})
206+
t.Run("Lazy", func(t *testing.T) {
207+
err = db.Query().Do(ctx, func(ctx context.Context, s query.Session) (err error) {
208+
tx, res, err := s.Execute(ctx, `SELECT 1`,
209+
query.WithTxControl(query.TxControl(query.BeginTx(query.WithSerializableReadWrite()))),
210+
)
211+
if err != nil {
212+
return err
213+
}
214+
rs, err := res.NextResultSet(ctx)
215+
if err != nil {
216+
return err
217+
}
218+
row, err := rs.NextRow(ctx)
219+
if err != nil {
220+
return err
221+
}
222+
var v int32
223+
err = row.Scan(&v)
224+
if err != nil {
225+
return err
226+
}
227+
if v != 1 {
228+
return fmt.Errorf("unexpected value from database: %d", v)
229+
}
230+
if err = res.Err(); err != nil {
231+
return err
232+
}
233+
res, err = tx.Execute(ctx, `SELECT 2`, query.WithCommit())
234+
if err != nil {
235+
return err
236+
}
237+
rs, err = res.NextResultSet(ctx)
238+
if err != nil {
239+
return err
240+
}
241+
row, err = rs.NextRow(ctx)
242+
if err != nil {
243+
return err
244+
}
245+
err = row.Scan(&v)
246+
if err != nil {
247+
return err
248+
}
249+
if v != 2 {
250+
return fmt.Errorf("unexpected value from database: %d", v)
251+
}
252+
return res.Err()
253+
}, query.WithIdempotent())
254+
require.NoError(t, err)
255+
})
256+
})
171257
}

0 commit comments

Comments
 (0)