Skip to content

Commit 7218f4e

Browse files
authored
Merge pull request #1117 from ydb-platform/query-logs
logging for `query.{Do,DoTx}` + refactoring of `query.{Do,DoTx}Options`
2 parents d343bc5 + 7bb2d7c commit 7218f4e

36 files changed

+903
-490
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
* Added logs over query service internals
2+
* Changed `trace.Query` events
3+
* Changed visibility of `query.{Do,DoTx}Options` from public to private
4+
15
## v3.57.0
26
* Added experimental implementation of query service client
37
* Fixed sometime panic on topic writer closing

internal/conn/grpc_client_stream.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,10 @@ func (s *grpcClientStream) wrapError(err error) error {
135135
return nil
136136
}
137137

138-
nodeErr := newConnError(s.c.endpoint.NodeID(), s.c.endpoint.Address(), err)
139-
140-
return xerrors.WithStackTrace(nodeErr, xerrors.WithSkipDepth(1))
138+
return xerrors.WithStackTrace(
139+
newConnError(s.c.endpoint.NodeID(), s.c.endpoint.Address(), err),
140+
xerrors.WithSkipDepth(1),
141+
)
141142
}
142143

143144
func createPinger(c *conn) context.CancelFunc {

internal/pool/pool.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,15 +107,15 @@ func (p *Pool[T]) try(ctx context.Context, f func(ctx context.Context, item *T)
107107
return nil
108108
}
109109

110-
func (p *Pool[T]) With(ctx context.Context, f func(ctx context.Context, item *T) error) error {
110+
func (p *Pool[T]) With(ctx context.Context, f func(ctx context.Context, item *T) error, opts ...retry.Option) error {
111111
err := retry.Retry(ctx, func(ctx context.Context) error {
112112
err := p.try(ctx, f)
113113
if err != nil {
114114
return xerrors.WithStackTrace(err)
115115
}
116116

117117
return nil
118-
})
118+
}, opts...)
119119
if err != nil {
120120
return xerrors.WithStackTrace(err)
121121
}

internal/query/client.go

Lines changed: 61 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@ import (
1212

1313
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
1414
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
15+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
16+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
1517
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1618
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1719
"github.com/ydb-platform/ydb-go-sdk/v3/query"
1820
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
21+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1922
)
2023

2124
//go:generate mockgen -destination grpc_client_mock_test.go -package query -write_package_comment=false github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1 QueryServiceClient,QueryService_AttachSessionClient,QueryService_ExecuteQueryClient
@@ -27,6 +30,7 @@ type balancer interface {
2730
var _ query.Client = (*Client)(nil)
2831

2932
type Client struct {
33+
config *config.Config
3034
grpcClient Ydb_Query_V1.QueryServiceClient
3135
pool *pool.Pool[Session]
3236
}
@@ -40,39 +44,63 @@ func (c Client) Close(ctx context.Context) error {
4044
return nil
4145
}
4246

43-
func do(ctx context.Context, pool *pool.Pool[Session], op query.Operation, opts *query.DoOptions) error {
44-
return retry.Retry(ctx, func(ctx context.Context) error {
45-
err := pool.With(ctx, func(ctx context.Context, s *Session) error {
46-
err := op(ctx, s)
47-
if err != nil {
48-
return xerrors.WithStackTrace(err)
49-
}
50-
51-
return nil
52-
})
47+
func do(
48+
ctx context.Context,
49+
pool *pool.Pool[Session],
50+
op query.Operation,
51+
t *trace.Query,
52+
opts ...options.DoOption,
53+
) (finalErr error) {
54+
doOpts := options.ParseDoOpts(t, opts...)
55+
56+
err := pool.With(ctx, func(ctx context.Context, s *Session) error {
57+
err := op(ctx, s)
5358
if err != nil {
5459
return xerrors.WithStackTrace(err)
5560
}
5661

5762
return nil
58-
}, opts.RetryOptions...)
59-
}
60-
61-
func (c Client) Do(ctx context.Context, op query.Operation, opts ...query.DoOption) error {
62-
doOptions := query.NewDoOptions(opts...)
63-
if doOptions.Label != "" {
64-
doOptions.RetryOptions = append(doOptions.RetryOptions, retry.WithLabel(doOptions.Label))
65-
}
66-
if doOptions.Idempotent {
67-
doOptions.RetryOptions = append(doOptions.RetryOptions, retry.WithIdempotent(doOptions.Idempotent))
63+
}, append(doOpts.RetryOpts(), retry.WithTrace(&trace.Retry{
64+
OnRetry: func(
65+
info trace.RetryLoopStartInfo,
66+
) func(
67+
trace.RetryLoopIntermediateInfo,
68+
) func(
69+
trace.RetryLoopDoneInfo,
70+
) {
71+
onIntermediate := trace.QueryOnDo(doOpts.Trace(), &ctx, stack.FunctionID(""))
72+
73+
return func(info trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) {
74+
onDone := onIntermediate(info.Error)
75+
76+
return func(info trace.RetryLoopDoneInfo) {
77+
onDone(info.Attempts, info.Error)
78+
}
79+
}
80+
},
81+
}))...)
82+
if err != nil {
83+
return xerrors.WithStackTrace(err)
6884
}
6985

70-
return do(ctx, c.pool, op, &doOptions)
86+
return nil
87+
}
88+
89+
func (c Client) Do(ctx context.Context, op query.Operation, opts ...options.DoOption) error {
90+
return do(ctx, c.pool, op, c.config.Trace(), opts...)
7191
}
7292

73-
func doTx(ctx context.Context, pool *pool.Pool[Session], op query.TxOperation, opts *query.DoTxOptions) error {
74-
return do(ctx, pool, func(ctx context.Context, s query.Session) error {
75-
tx, err := s.Begin(ctx, opts.TxSettings)
93+
func doTx(
94+
ctx context.Context,
95+
pool *pool.Pool[Session],
96+
op query.TxOperation,
97+
t *trace.Query,
98+
opts ...options.DoTxOption,
99+
) error {
100+
doTxOpts := options.ParseDoTxOpts(t, opts...)
101+
102+
err := do(ctx, pool, func(ctx context.Context, s query.Session) error {
103+
tx, err := s.Begin(ctx, doTxOpts.TxSettings())
76104
if err != nil {
77105
return xerrors.WithStackTrace(err)
78106
}
@@ -96,19 +124,16 @@ func doTx(ctx context.Context, pool *pool.Pool[Session], op query.TxOperation, o
96124
}
97125

98126
return nil
99-
}, &opts.DoOptions)
100-
}
101-
102-
func (c Client) DoTx(ctx context.Context, op query.TxOperation, opts ...query.DoTxOption) error {
103-
doTxOptions := query.NewDoTxOptions(opts...)
104-
if doTxOptions.Label != "" {
105-
doTxOptions.RetryOptions = append(doTxOptions.RetryOptions, retry.WithLabel(doTxOptions.Label))
106-
}
107-
if doTxOptions.Idempotent {
108-
doTxOptions.RetryOptions = append(doTxOptions.RetryOptions, retry.WithIdempotent(doTxOptions.Idempotent))
127+
}, t, doTxOpts.DoOpts()...)
128+
if err != nil {
129+
return xerrors.WithStackTrace(err)
109130
}
110131

111-
return doTx(ctx, c.pool, op, &doTxOptions)
132+
return nil
133+
}
134+
135+
func (c Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options.DoTxOption) error {
136+
return doTx(ctx, c.pool, op, c.config.Trace(), opts...)
112137
}
113138

114139
func deleteSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessionID string) error {
@@ -231,6 +256,7 @@ func createSession(
231256

232257
func New(ctx context.Context, balancer balancer, config *config.Config) (*Client, error) {
233258
client := &Client{
259+
config: config,
234260
grpcClient: Ydb_Query_V1.NewQueryServiceClient(balancer),
235261
}
236262

internal/query/client_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1919
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
2020
"github.com/ydb-platform/ydb-go-sdk/v3/query"
21+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2122
)
2223

2324
func TestCreateSession(t *testing.T) {
@@ -182,7 +183,7 @@ func TestDo(t *testing.T) {
182183
return newTestSession()
183184
}), func(ctx context.Context, s query.Session) error {
184185
return nil
185-
}, &query.DoOptions{})
186+
}, &trace.Query{})
186187
require.NoError(t, err)
187188
})
188189
t.Run("RetryableError", func(t *testing.T) {
@@ -196,7 +197,7 @@ func TestDo(t *testing.T) {
196197
}
197198

198199
return nil
199-
}, &query.DoOptions{})
200+
}, &trace.Query{})
200201
require.NoError(t, err)
201202
require.Equal(t, 10, counter)
202203
})
@@ -217,7 +218,7 @@ func TestDoTx(t *testing.T) {
217218
return newTestSessionWithClient(client)
218219
}), func(ctx context.Context, tx query.TxActor) error {
219220
return nil
220-
}, &query.DoTxOptions{})
221+
}, &trace.Query{})
221222
require.NoError(t, err)
222223
})
223224
t.Run("RetryableError", func(t *testing.T) {
@@ -242,7 +243,7 @@ func TestDoTx(t *testing.T) {
242243
}
243244

244245
return nil
245-
}, &query.DoTxOptions{})
246+
}, &trace.Query{})
246247
require.NoError(t, err)
247248
require.Equal(t, 10, counter)
248249
})

internal/query/config/options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"time"
55

66
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
7+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
78
)
89

910
type Option func(*Config)
@@ -15,6 +16,13 @@ func With(config config.Common) Option {
1516
}
1617
}
1718

19+
// WithTrace appends table trace to early defined traces
20+
func WithTrace(trace *trace.Query, opts ...trace.QueryComposeOption) Option {
21+
return func(c *Config) {
22+
c.trace = c.trace.Compose(trace, opts...)
23+
}
24+
}
25+
1826
// WithSizeLimit defines upper bound of pooled sessions.
1927
// If sizeLimit is less than or equal to zero then the
2028
// DefaultPoolMaxSize variable is used as a limit.

internal/query/errors.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
var (
88
ErrNotImplemented = errors.New("not implemented yet")
99
errWrongNextResultSetIndex = errors.New("wrong result set index")
10-
errInterruptedStream = errors.New("interrupted stream")
1110
errClosedResult = errors.New("result closed early")
1211
errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index")
1312
)

internal/query/execute_query.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,17 @@ import (
99

1010
"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
1111
"github.com/ydb-platform/ydb-go-sdk/v3/internal/params"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1415
"github.com/ydb-platform/ydb-go-sdk/v3/query"
1516
)
1617

1718
type executeConfig interface {
18-
ExecMode() query.ExecMode
19-
StatsMode() query.StatsMode
19+
ExecMode() options.ExecMode
20+
StatsMode() options.StatsMode
2021
TxControl() *query.TransactionControl
21-
Syntax() query.Syntax
22+
Syntax() options.Syntax
2223
Params() *params.Parameters
2324
CallOptions() []grpc.CallOption
2425
}

0 commit comments

Comments
 (0)