Skip to content

Commit 6f514c4

Browse files
committed
logging for query.{Do,DoTx} + fixes of query.{Do,DoTx}Options
1 parent d343bc5 commit 6f514c4

25 files changed

+521
-301
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
* Added logs over query service internals
2+
* Changed `trace.Query` events
3+
* Changed visibility of `query.{Do,DoTx}Options` from public to private
4+
15
## v3.57.0
26
* Added experimental implementation of query service client
37
* Fixed sometime panic on topic writer closing

internal/conn/grpc_client_stream.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,10 @@ func (s *grpcClientStream) wrapError(err error) error {
135135
return nil
136136
}
137137

138-
nodeErr := newConnError(s.c.endpoint.NodeID(), s.c.endpoint.Address(), err)
139-
140-
return xerrors.WithStackTrace(nodeErr, xerrors.WithSkipDepth(1))
138+
return xerrors.WithStackTrace(
139+
newConnError(s.c.endpoint.NodeID(), s.c.endpoint.Address(), err),
140+
xerrors.WithSkipDepth(1),
141+
)
141142
}
142143

143144
func createPinger(c *conn) context.CancelFunc {

internal/pool/pool.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,15 +107,15 @@ func (p *Pool[T]) try(ctx context.Context, f func(ctx context.Context, item *T)
107107
return nil
108108
}
109109

110-
func (p *Pool[T]) With(ctx context.Context, f func(ctx context.Context, item *T) error) error {
110+
func (p *Pool[T]) With(ctx context.Context, f func(ctx context.Context, item *T) error, opts ...retry.Option) error {
111111
err := retry.Retry(ctx, func(ctx context.Context) error {
112112
err := p.try(ctx, f)
113113
if err != nil {
114114
return xerrors.WithStackTrace(err)
115115
}
116116

117117
return nil
118-
})
118+
}, opts...)
119119
if err != nil {
120120
return xerrors.WithStackTrace(err)
121121
}

internal/query/client.go

Lines changed: 58 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ import (
1212

1313
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
1414
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
15+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
1516
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1617
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1718
"github.com/ydb-platform/ydb-go-sdk/v3/query"
1819
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
20+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1921
)
2022

2123
//go:generate mockgen -destination grpc_client_mock_test.go -package query -write_package_comment=false github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1 QueryServiceClient,QueryService_AttachSessionClient,QueryService_ExecuteQueryClient
@@ -27,6 +29,7 @@ type balancer interface {
2729
var _ query.Client = (*Client)(nil)
2830

2931
type Client struct {
32+
config *config.Config
3033
grpcClient Ydb_Query_V1.QueryServiceClient
3134
pool *pool.Pool[Session]
3235
}
@@ -40,39 +43,63 @@ func (c Client) Close(ctx context.Context) error {
4043
return nil
4144
}
4245

43-
func do(ctx context.Context, pool *pool.Pool[Session], op query.Operation, opts *query.DoOptions) error {
44-
return retry.Retry(ctx, func(ctx context.Context) error {
45-
err := pool.With(ctx, func(ctx context.Context, s *Session) error {
46-
err := op(ctx, s)
47-
if err != nil {
48-
return xerrors.WithStackTrace(err)
49-
}
50-
51-
return nil
52-
})
46+
func do(
47+
ctx context.Context,
48+
pool *pool.Pool[Session],
49+
op query.Operation,
50+
t *trace.Query,
51+
opts ...query.DoOption,
52+
) (finalErr error) {
53+
doOpts := query.ParseDoOpts(t, opts...)
54+
55+
err := pool.With(ctx, func(ctx context.Context, s *Session) error {
56+
err := op(ctx, s)
5357
if err != nil {
5458
return xerrors.WithStackTrace(err)
5559
}
5660

5761
return nil
58-
}, opts.RetryOptions...)
62+
}, append(doOpts.RetryOpts(), retry.WithTrace(&trace.Retry{
63+
OnRetry: func(
64+
info trace.RetryLoopStartInfo,
65+
) func(
66+
trace.RetryLoopIntermediateInfo,
67+
) func(
68+
trace.RetryLoopDoneInfo,
69+
) {
70+
onIntermediate := trace.QueryOnDo(doOpts.Trace(), &ctx, stack.FunctionID(""))
71+
72+
return func(info trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) {
73+
onDone := onIntermediate(info.Error)
74+
75+
return func(info trace.RetryLoopDoneInfo) {
76+
onDone(info.Attempts, info.Error)
77+
}
78+
}
79+
},
80+
}))...)
81+
if err != nil {
82+
return xerrors.WithStackTrace(err)
83+
}
84+
85+
return nil
5986
}
6087

6188
func (c Client) Do(ctx context.Context, op query.Operation, opts ...query.DoOption) error {
62-
doOptions := query.NewDoOptions(opts...)
63-
if doOptions.Label != "" {
64-
doOptions.RetryOptions = append(doOptions.RetryOptions, retry.WithLabel(doOptions.Label))
65-
}
66-
if doOptions.Idempotent {
67-
doOptions.RetryOptions = append(doOptions.RetryOptions, retry.WithIdempotent(doOptions.Idempotent))
68-
}
69-
70-
return do(ctx, c.pool, op, &doOptions)
89+
return do(ctx, c.pool, op, c.config.Trace(), opts...)
7190
}
7291

73-
func doTx(ctx context.Context, pool *pool.Pool[Session], op query.TxOperation, opts *query.DoTxOptions) error {
74-
return do(ctx, pool, func(ctx context.Context, s query.Session) error {
75-
tx, err := s.Begin(ctx, opts.TxSettings)
92+
func doTx(
93+
ctx context.Context,
94+
pool *pool.Pool[Session],
95+
op query.TxOperation,
96+
t *trace.Query,
97+
opts ...query.DoTxOption,
98+
) error {
99+
doTxOpts := query.ParseDoTxOpts(t, opts...)
100+
101+
err := do(ctx, pool, func(ctx context.Context, s query.Session) error {
102+
tx, err := s.Begin(ctx, doTxOpts.TxSettings())
76103
if err != nil {
77104
return xerrors.WithStackTrace(err)
78105
}
@@ -96,19 +123,16 @@ func doTx(ctx context.Context, pool *pool.Pool[Session], op query.TxOperation, o
96123
}
97124

98125
return nil
99-
}, &opts.DoOptions)
126+
}, t, doTxOpts.DoOpts()...)
127+
if err != nil {
128+
return xerrors.WithStackTrace(err)
129+
}
130+
131+
return nil
100132
}
101133

102134
func (c Client) DoTx(ctx context.Context, op query.TxOperation, opts ...query.DoTxOption) error {
103-
doTxOptions := query.NewDoTxOptions(opts...)
104-
if doTxOptions.Label != "" {
105-
doTxOptions.RetryOptions = append(doTxOptions.RetryOptions, retry.WithLabel(doTxOptions.Label))
106-
}
107-
if doTxOptions.Idempotent {
108-
doTxOptions.RetryOptions = append(doTxOptions.RetryOptions, retry.WithIdempotent(doTxOptions.Idempotent))
109-
}
110-
111-
return doTx(ctx, c.pool, op, &doTxOptions)
135+
return doTx(ctx, c.pool, op, c.config.Trace(), opts...)
112136
}
113137

114138
func deleteSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessionID string) error {
@@ -231,6 +255,7 @@ func createSession(
231255

232256
func New(ctx context.Context, balancer balancer, config *config.Config) (*Client, error) {
233257
client := &Client{
258+
config: config,
234259
grpcClient: Ydb_Query_V1.NewQueryServiceClient(balancer),
235260
}
236261

internal/query/client_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1919
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
2020
"github.com/ydb-platform/ydb-go-sdk/v3/query"
21+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2122
)
2223

2324
func TestCreateSession(t *testing.T) {
@@ -182,7 +183,7 @@ func TestDo(t *testing.T) {
182183
return newTestSession()
183184
}), func(ctx context.Context, s query.Session) error {
184185
return nil
185-
}, &query.DoOptions{})
186+
}, &trace.Query{})
186187
require.NoError(t, err)
187188
})
188189
t.Run("RetryableError", func(t *testing.T) {
@@ -196,7 +197,7 @@ func TestDo(t *testing.T) {
196197
}
197198

198199
return nil
199-
}, &query.DoOptions{})
200+
}, &trace.Query{})
200201
require.NoError(t, err)
201202
require.Equal(t, 10, counter)
202203
})
@@ -217,7 +218,7 @@ func TestDoTx(t *testing.T) {
217218
return newTestSessionWithClient(client)
218219
}), func(ctx context.Context, tx query.TxActor) error {
219220
return nil
220-
}, &query.DoTxOptions{})
221+
}, &trace.Query{})
221222
require.NoError(t, err)
222223
})
223224
t.Run("RetryableError", func(t *testing.T) {
@@ -242,7 +243,7 @@ func TestDoTx(t *testing.T) {
242243
}
243244

244245
return nil
245-
}, &query.DoTxOptions{})
246+
}, &trace.Query{})
246247
require.NoError(t, err)
247248
require.Equal(t, 10, counter)
248249
})

internal/query/config/options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"time"
55

66
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
7+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
78
)
89

910
type Option func(*Config)
@@ -15,6 +16,13 @@ func With(config config.Common) Option {
1516
}
1617
}
1718

19+
// WithTrace appends table trace to early defined traces
20+
func WithTrace(trace *trace.Query, opts ...trace.QueryComposeOption) Option {
21+
return func(c *Config) {
22+
c.trace = c.trace.Compose(trace, opts...)
23+
}
24+
}
25+
1826
// WithSizeLimit defines upper bound of pooled sessions.
1927
// If sizeLimit is less than or equal to zero then the
2028
// DefaultPoolMaxSize variable is used as a limit.

internal/query/errors.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
var (
88
ErrNotImplemented = errors.New("not implemented yet")
99
errWrongNextResultSetIndex = errors.New("wrong result set index")
10-
errInterruptedStream = errors.New("interrupted stream")
1110
errClosedResult = errors.New("result closed early")
1211
errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index")
1312
)

internal/query/execute_query_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -631,9 +631,9 @@ func TestExecute(t *testing.T) {
631631
ctx := xtest.Context(t)
632632
ctrl := gomock.NewController(t)
633633
stream := NewMockQueryService_ExecuteQueryClient(ctrl)
634-
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
635-
Status: Ydb.StatusIds_UNAVAILABLE,
636-
}, nil)
634+
stream.EXPECT().Recv().Return(nil, xerrors.Operation(xerrors.WithStatusCode(
635+
Ydb.StatusIds_UNAVAILABLE,
636+
)))
637637
service := NewMockQueryServiceClient(ctrl)
638638
service.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)
639639
t.Log("execute")
@@ -707,9 +707,9 @@ func TestExecute(t *testing.T) {
707707
},
708708
},
709709
}, nil)
710-
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
711-
Status: Ydb.StatusIds_UNAVAILABLE,
712-
}, nil)
710+
stream.EXPECT().Recv().Return(nil, xerrors.Operation(xerrors.WithStatusCode(
711+
Ydb.StatusIds_UNAVAILABLE,
712+
)))
713713
service := NewMockQueryServiceClient(ctrl)
714714
service.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)
715715
t.Log("execute")

0 commit comments

Comments
 (0)