Skip to content

Commit 9200740

Browse files
authored
Merge pull request #1145 from ydb-platform/slo-logs
Performance of QueryService client over SLO + refactoring of traces
2 parents b3b6870 + f9e22e1 commit 9200740

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+903
-1124
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33
* Added query pool metrics
44
* Fixed logic of query session pool
55
* Changed initialization of internal driver clients to lazy
6+
* Disabled the logic of background grpc-connection parking
7+
* Removed `ydb.WithSessionPoolSizeLimit()` option
8+
* Added async put session into pool if external context is done
9+
* Dropped intermediate callbacks from `trace.{Table,Retry,Query}` events
10+
* Wrapped errors from `internal/pool.Pool.getItem` as retryable
611
* Disabled the logic of background grpc-connection parking
712
* Improved stringification for postgres types
813

driver.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ func New(ctx context.Context, opts ...Option) (_ *Driver, err error) {
300300

301301
//nolint:cyclop, nonamedreturns
302302
func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, err error) {
303-
ctx, driverCtxCancel := xcontext.WithCancel(xcontext.WithoutDeadline(ctx))
303+
ctx, driverCtxCancel := xcontext.WithCancel(xcontext.ValueOnly(ctx))
304304
defer func() {
305305
if err != nil {
306306
driverCtxCancel()
@@ -401,7 +401,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
401401
}
402402

403403
d.table = xsync.OnceValue(func() *internalTable.Client {
404-
return internalTable.New(xcontext.WithoutDeadline(ctx),
404+
return internalTable.New(xcontext.ValueOnly(ctx),
405405
d.balancer,
406406
tableConfig.New(
407407
append(
@@ -416,7 +416,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
416416
})
417417

418418
d.query = xsync.OnceValue(func() *internalQuery.Client {
419-
return internalQuery.New(xcontext.WithoutDeadline(ctx),
419+
return internalQuery.New(xcontext.ValueOnly(ctx),
420420
d.balancer,
421421
queryConfig.New(
422422
append(
@@ -434,7 +434,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
434434
}
435435

436436
d.scheme = xsync.OnceValue(func() *internalScheme.Client {
437-
return internalScheme.New(xcontext.WithoutDeadline(ctx),
437+
return internalScheme.New(xcontext.ValueOnly(ctx),
438438
d.balancer,
439439
schemeConfig.New(
440440
append(
@@ -450,7 +450,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
450450
})
451451

452452
d.coordination = xsync.OnceValue(func() *internalCoordination.Client {
453-
return internalCoordination.New(xcontext.WithoutDeadline(ctx),
453+
return internalCoordination.New(xcontext.ValueOnly(ctx),
454454
d.balancer,
455455
coordinationConfig.New(
456456
append(
@@ -465,7 +465,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
465465
})
466466

467467
d.ratelimiter = xsync.OnceValue(func() *internalRatelimiter.Client {
468-
return internalRatelimiter.New(xcontext.WithoutDeadline(ctx),
468+
return internalRatelimiter.New(xcontext.ValueOnly(ctx),
469469
d.balancer,
470470
ratelimiterConfig.New(
471471
append(
@@ -480,7 +480,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
480480
})
481481

482482
d.discovery = xsync.OnceValue(func() *internalDiscovery.Client {
483-
return internalDiscovery.New(xcontext.WithoutDeadline(ctx),
483+
return internalDiscovery.New(xcontext.ValueOnly(ctx),
484484
d.pool.Get(endpoint.New(d.config.Endpoint())),
485485
discoveryConfig.New(
486486
append(
@@ -499,7 +499,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
499499
})
500500

501501
d.scripting = xsync.OnceValue(func() *internalScripting.Client {
502-
return internalScripting.New(xcontext.WithoutDeadline(ctx),
502+
return internalScripting.New(xcontext.ValueOnly(ctx),
503503
d.balancer,
504504
scriptingConfig.New(
505505
append(
@@ -514,7 +514,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
514514
})
515515

516516
d.topic = xsync.OnceValue(func() *topicclientinternal.Client {
517-
return topicclientinternal.New(xcontext.WithoutDeadline(ctx),
517+
return topicclientinternal.New(xcontext.ValueOnly(ctx),
518518
d.balancer,
519519
d.config.Credentials(),
520520
append(

internal/balancer/balancer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ func New(
280280
}
281281
// run background discovering
282282
if d := discoveryConfig.Interval(); d > 0 {
283-
b.discoveryRepeater = repeater.New(xcontext.WithoutDeadline(ctx),
283+
b.discoveryRepeater = repeater.New(xcontext.ValueOnly(ctx),
284284
d, b.clusterDiscoveryAttempt,
285285
repeater.WithName("discovery"),
286286
repeater.WithTrace(b.driverConfig.Trace()),

internal/conn/conn.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,10 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
183183
}, c.config.GrpcDialOptions()...,
184184
)...)
185185
if err != nil {
186+
if xerrors.IsContextError(err) {
187+
return nil, xerrors.WithStackTrace(err)
188+
}
189+
186190
defer func() {
187191
c.onTransportError(ctx, err)
188192
}()
@@ -310,6 +314,10 @@ func (c *conn) Invoke(
310314

311315
err = cc.Invoke(ctx, method, req, res, append(opts, grpc.Trailer(&md))...)
312316
if err != nil {
317+
if xerrors.IsContextError(err) {
318+
return xerrors.WithStackTrace(err)
319+
}
320+
313321
defer func() {
314322
c.onTransportError(ctx, err)
315323
}()
@@ -392,6 +400,10 @@ func (c *conn) NewStream(
392400

393401
s, err = cc.NewStream(ctx, desc, method, opts...)
394402
if err != nil {
403+
if xerrors.IsContextError(err) {
404+
return nil, xerrors.WithStackTrace(err)
405+
}
406+
395407
defer func() {
396408
c.onTransportError(ctx, err)
397409
}()

internal/conn/grpc_client_stream.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ func (s *grpcClientStream) CloseSend() (err error) {
3333
err = s.ClientStream.CloseSend()
3434

3535
if err != nil {
36+
if xerrors.IsContextError(err) {
37+
return xerrors.WithStackTrace(err)
38+
}
39+
3640
if s.wrapping {
3741
return s.wrapError(
3842
xerrors.Transport(
@@ -58,6 +62,10 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
5862
err = s.ClientStream.SendMsg(m)
5963

6064
if err != nil {
65+
if xerrors.IsContextError(err) {
66+
return xerrors.WithStackTrace(err)
67+
}
68+
6169
defer func() {
6270
s.c.onTransportError(s.Context(), err)
6371
}()
@@ -97,7 +105,11 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
97105

98106
err = s.ClientStream.RecvMsg(m)
99107

100-
if err != nil {
108+
if err != nil { //nolint:nestif
109+
if xerrors.IsContextError(err) {
110+
return xerrors.WithStackTrace(err)
111+
}
112+
101113
defer func() {
102114
if !xerrors.Is(err, io.EOF) {
103115
s.c.onTransportError(s.Context(), err)

internal/conn/pool.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,24 @@ func (p *Pool) Ban(ctx context.Context, cc Conn, cause error) {
7878
return
7979
}
8080

81-
if xerrors.IsTransportError(cause,
82-
grpcCodes.OK,
83-
grpcCodes.Canceled,
81+
if !xerrors.IsTransportError(cause,
8482
grpcCodes.ResourceExhausted,
85-
grpcCodes.OutOfRange,
83+
grpcCodes.Unavailable,
84+
// grpcCodes.OK,
85+
// grpcCodes.Canceled,
86+
// grpcCodes.Unknown,
87+
// grpcCodes.InvalidArgument,
88+
// grpcCodes.DeadlineExceeded,
89+
// grpcCodes.NotFound,
90+
// grpcCodes.AlreadyExists,
91+
// grpcCodes.PermissionDenied,
92+
// grpcCodes.FailedPrecondition,
93+
// grpcCodes.Aborted,
94+
// grpcCodes.OutOfRange,
95+
// grpcCodes.Unimplemented,
96+
// grpcCodes.Internal,
97+
// grpcCodes.DataLoss,
98+
// grpcCodes.Unauthenticated,
8699
) {
87100
return
88101
}

internal/pool/errors.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,9 @@ package pool
22

33
import (
44
"errors"
5-
6-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
75
)
86

97
var (
108
errClosedPool = errors.New("closed pool")
11-
errPoolOverflow = xerrors.Retryable(errors.New("pool overflow"))
129
errItemIsNotAlive = errors.New("item is not alive")
1310
)

0 commit comments

Comments
 (0)