Skip to content

Commit 3ebc717

Browse files
authored
Merge pull request #1416 from ydb-platform/execute-idempotent
make option `query.WithIdempotent()` as option for `query.Client.{Exec,Query,QueryResultSet,QueryRow}`
2 parents 48ab697 + d1d51a3 commit 3ebc717

File tree

14 files changed

+102
-153
lines changed

14 files changed

+102
-153
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
* Removed experimental methods `query.Session.ReadResultSet` and `query.Session.ReadRows`
99
* Removed experimental methods `query.TxActor.ReadResultSet` and `query.TxActor.ReadRows`
1010
* Removed experimental method `query.Client.Stats`
11+
* Option `query.WithIdempotent()` allowed for `query.Client.{Exec,Query,QueryResultSet,QueryRow}` methods now
1112
* Added experimental support for operation service client through `db.Operation()` method (supports methods `Get`, `List`, `Cancel` and `Forget`)
1213

1314
## v3.76.6

example_test.go

Lines changed: 33 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
//go:build go1.23
2+
13
package ydb_test
24

35
import (
46
"context"
57
"database/sql"
6-
"errors"
78
"fmt"
89
"io"
910
"log"
@@ -28,61 +29,45 @@ func Example_query() {
2829
ctx := context.TODO()
2930
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
3031
if err != nil {
31-
log.Fatal(err)
32+
panic(err)
3233
}
3334
defer db.Close(ctx) // cleanup resources
3435

35-
err = db.Query().Do( // Do retry operation on errors with best effort
36+
materializedResult, err := db.Query().Query( // Do retry operation on errors with best effort
3637
ctx, // context manage exiting from Do
37-
func(ctx context.Context, s query.Session) (err error) { // retry operation
38-
streamResult, err := s.Query(ctx,
39-
`SELECT $id as myId, $str as myStr`,
40-
query.WithParameters(
41-
ydb.ParamsBuilder().
42-
Param("$id").Uint64(42).
43-
Param("$str").Text("my string").
44-
Build(),
45-
),
46-
)
38+
`SELECT $id as myId, $str as myStr`,
39+
query.WithParameters(
40+
ydb.ParamsBuilder().
41+
Param("$id").Uint64(42).
42+
Param("$str").Text("my string").
43+
Build(),
44+
),
45+
query.WithIdempotent(),
46+
)
47+
if err != nil {
48+
panic(err)
49+
}
50+
defer func() { _ = materializedResult.Close(ctx) }() // cleanup resources
51+
for rs, err := range materializedResult.ResultSets(ctx) {
52+
if err != nil {
53+
panic(err)
54+
}
55+
for row, err := range rs.Rows(ctx) {
4756
if err != nil {
48-
return err // for auto-retry with driver
57+
panic(err)
4958
}
50-
defer func() { _ = streamResult.Close(ctx) }() // cleanup resources
51-
for { // iterate over result sets
52-
rs, err := streamResult.NextResultSet(ctx)
53-
if err != nil {
54-
if errors.Is(err, io.EOF) {
55-
break
56-
}
57-
58-
return err
59-
}
60-
for { // iterate over rows
61-
row, err := rs.NextRow(ctx)
62-
if err != nil {
63-
if errors.Is(err, io.EOF) {
64-
break
65-
}
66-
67-
return err
68-
}
69-
type myStruct struct {
70-
ID uint64 `sql:"id"`
71-
Str string `sql:"myStr"`
72-
}
73-
var s myStruct
74-
if err = row.ScanStruct(&s); err != nil {
75-
return err // generally scan error not retryable, return it for driver check error
76-
}
77-
}
59+
type myStruct struct {
60+
ID uint64 `sql:"id"`
61+
Str string `sql:"myStr"`
7862
}
79-
80-
return nil
81-
},
82-
query.WithIdempotent(),
83-
)
63+
var s myStruct
64+
if err = row.ScanStruct(&s); err != nil {
65+
panic(err)
66+
}
67+
}
68+
}
8469
if err != nil {
85-
log.Printf("unexpected error: %v", err)
70+
panic(err)
8671
}
8772
}
8873

internal/query/client.go

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,10 @@ func do(
322322

323323
err := op(ctx, s)
324324
if err != nil {
325+
if xerrors.IsOperationError(err) {
326+
s.setStatus(statusClosed)
327+
}
328+
325329
return xerrors.WithStackTrace(err)
326330
}
327331

@@ -411,7 +415,7 @@ func clientQueryRow(
411415
}
412416

413417
return nil
414-
})
418+
}, settings.RetryOpts()...)
415419
if err != nil {
416420
return nil, xerrors.WithStackTrace(err)
417421
}
@@ -441,14 +445,20 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute
441445
}
442446

443447
func clientExec(ctx context.Context, pool sessionPool, q string, opts ...options.Execute) (finalErr error) {
448+
settings := options.ExecuteSettings(opts...)
444449
err := do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
445-
err = s.Exec(ctx, q, opts...)
450+
_, r, err := execute(ctx, s.id, s.queryServiceClient, q, settings, withTrace(s.cfg.Trace()))
451+
if err != nil {
452+
return xerrors.WithStackTrace(err)
453+
}
454+
455+
err = readAll(ctx, r)
446456
if err != nil {
447457
return xerrors.WithStackTrace(err)
448458
}
449459

450460
return nil
451-
})
461+
}, settings.RetryOpts()...)
452462
if err != nil {
453463
return xerrors.WithStackTrace(err)
454464
}
@@ -479,8 +489,15 @@ func (c *Client) Exec(ctx context.Context, q string, opts ...options.Execute) (f
479489
func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...options.Execute) (
480490
r query.Result, err error,
481491
) {
492+
settings := options.ExecuteSettings(opts...)
482493
err = do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
483-
streamResult, err := s.Query(ctx, q, opts...)
494+
_, streamResult, err := execute(ctx, s.id, s.queryServiceClient, q,
495+
options.ExecuteSettings(opts...), withTrace(s.cfg.Trace()),
496+
)
497+
if err != nil {
498+
return xerrors.WithStackTrace(err)
499+
}
500+
484501
if err != nil {
485502
return xerrors.WithStackTrace(err)
486503
}
@@ -494,7 +511,7 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
494511
}
495512

496513
return nil
497-
})
514+
}, settings.RetryOpts()...)
498515
if err != nil {
499516
return nil, xerrors.WithStackTrace(err)
500517
}
@@ -528,24 +545,16 @@ func clientQueryResultSet(
528545
err := do(ctx, pool, func(ctx context.Context, s *Session) error {
529546
_, r, err := execute(ctx, s.id, s.queryServiceClient, q, settings, resultOpts...)
530547
if err != nil {
531-
if xerrors.IsOperationError(err) {
532-
s.setStatus(statusClosed)
533-
}
534-
535548
return xerrors.WithStackTrace(err)
536549
}
537550

538551
rs, err = readMaterializedResultSet(ctx, r)
539552
if err != nil {
540-
if xerrors.IsOperationError(err) {
541-
s.setStatus(statusClosed)
542-
}
543-
544553
return xerrors.WithStackTrace(err)
545554
}
546555

547556
return nil
548-
})
557+
}, settings.RetryOpts()...)
549558
if err != nil {
550559
return nil, xerrors.WithStackTrace(err)
551560
}

internal/query/execute_query.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1919
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
2020
"github.com/ydb-platform/ydb-go-sdk/v3/query"
21+
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
2122
"github.com/ydb-platform/ydb-go-sdk/v3/table/stats"
2223
)
2324

@@ -29,6 +30,7 @@ type executeSettings interface {
2930
Syntax() options.Syntax
3031
Params() *params.Parameters
3132
CallOptions() []grpc.CallOption
33+
RetryOpts() []retry.Option
3234
}
3335

3436
type executeScriptConfig interface {

internal/query/options/execute.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/ydb-platform/ydb-go-sdk/v3/internal/params"
88
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/tx"
99
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stats"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
1011
)
1112

1213
var (
@@ -33,6 +34,7 @@ type (
3334
statsCallback func(queryStats stats.QueryStats)
3435
callOptions []grpc.CallOption
3536
txControl *tx.Control
37+
retryOptions []retry.Option
3638
}
3739

3840
// Execute is an interface for execute method options
@@ -53,6 +55,10 @@ type (
5355
execModeOption = ExecMode
5456
)
5557

58+
func (s *executeSettings) RetryOpts() []retry.Option {
59+
return s.retryOptions
60+
}
61+
5662
func (s *executeSettings) StatsCallback() func(stats.QueryStats) {
5763
return s.statsCallback
5864
}

internal/query/options/retry.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ type (
4444
}
4545
)
4646

47+
func (opts RetryOptionsOption) applyExecuteOption(s *executeSettings) {
48+
s.retryOptions = append(s.retryOptions, opts...)
49+
}
50+
4751
func (s *doSettings) Trace() *trace.Query {
4852
return s.trace
4953
}

internal/query/session.go

Lines changed: 3 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -39,26 +39,13 @@ func (s *Session) QueryResultSet(
3939
onDone(finalErr)
4040
}()
4141

42-
settings := options.ExecuteSettings(opts...)
43-
44-
resultOpts := []resultOption{
45-
withTrace(s.cfg.Trace()),
46-
}
47-
_, r, err := execute(ctx, s.id, s.queryServiceClient, q, settings, resultOpts...)
42+
_, r, err := execute(ctx, s.id, s.queryServiceClient, q, options.ExecuteSettings(opts...), withTrace(s.cfg.Trace()))
4843
if err != nil {
49-
if xerrors.IsOperationError(err) {
50-
s.setStatus(statusClosed)
51-
}
52-
5344
return nil, xerrors.WithStackTrace(err)
5445
}
5546

5647
rs, err = readResultSet(ctx, r)
5748
if err != nil {
58-
if xerrors.IsOperationError(err) {
59-
s.setStatus(statusClosed)
60-
}
61-
6249
return nil, xerrors.WithStackTrace(err)
6350
}
6451

@@ -70,19 +57,11 @@ func (s *Session) queryRow(
7057
) (row query.Row, finalErr error) {
7158
_, r, err := execute(ctx, s.id, s.queryServiceClient, q, settings, resultOpts...)
7259
if err != nil {
73-
if xerrors.IsOperationError(err) {
74-
s.setStatus(statusClosed)
75-
}
76-
7760
return nil, xerrors.WithStackTrace(err)
7861
}
7962

8063
row, err = readRow(ctx, r)
8164
if err != nil {
82-
if xerrors.IsOperationError(err) {
83-
s.setStatus(statusClosed)
84-
}
85-
8665
return nil, xerrors.WithStackTrace(err)
8766
}
8867

@@ -98,10 +77,6 @@ func (s *Session) QueryRow(ctx context.Context, q string, opts ...options.Execut
9877

9978
row, err := s.queryRow(ctx, q, options.ExecuteSettings(opts...), withTrace(s.cfg.Trace()))
10079
if err != nil {
101-
if xerrors.IsOperationError(err) {
102-
s.setStatus(statusClosed)
103-
}
104-
10580
return nil, xerrors.WithStackTrace(err)
10681
}
10782

@@ -316,27 +291,13 @@ func (s *Session) Exec(
316291
onDone(finalErr)
317292
}()
318293

319-
settings := options.ExecuteSettings(opts...)
320-
321-
resultOpts := []resultOption{
322-
withTrace(s.cfg.Trace()),
323-
}
324-
325-
_, r, err := execute(ctx, s.id, s.queryServiceClient, q, settings, resultOpts...)
294+
_, r, err := execute(ctx, s.id, s.queryServiceClient, q, options.ExecuteSettings(opts...), withTrace(s.cfg.Trace()))
326295
if err != nil {
327-
if xerrors.IsOperationError(err) {
328-
s.setStatus(statusClosed)
329-
}
330-
331296
return xerrors.WithStackTrace(err)
332297
}
333298

334299
err = readAll(ctx, r)
335300
if err != nil {
336-
if xerrors.IsOperationError(err) {
337-
s.setStatus(statusClosed)
338-
}
339-
340301
return xerrors.WithStackTrace(err)
341302
}
342303

@@ -352,17 +313,8 @@ func (s *Session) Query(
352313
onDone(finalErr)
353314
}()
354315

355-
settings := options.ExecuteSettings(opts...)
356-
357-
resultOpts := []resultOption{
358-
withTrace(s.cfg.Trace()),
359-
}
360-
_, r, err := execute(ctx, s.id, s.queryServiceClient, q, settings, resultOpts...)
316+
_, r, err := execute(ctx, s.id, s.queryServiceClient, q, options.ExecuteSettings(opts...), withTrace(s.cfg.Trace()))
361317
if err != nil {
362-
if xerrors.IsOperationError(err) {
363-
s.setStatus(statusClosed)
364-
}
365-
366318
return nil, xerrors.WithStackTrace(err)
367319
}
368320

0 commit comments

Comments
 (0)