Skip to content

Commit a429d38

Browse files
authored
Merge pull request #1536 from ydb-platform/add-metadate-middleware
Added balancer with metadata customization
2 parents 17a901b + cb2dd1a commit a429d38

File tree

5 files changed

+52
-7
lines changed

5 files changed

+52
-7
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Wrapped internal balancer with metadata middleware
2+
13
## v3.89.2
24
* Returned log.XXX methods for create fields, removed from public at v3.85.0
35

balancer.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package ydb
2+
3+
import (
4+
"context"
5+
6+
"google.golang.org/grpc"
7+
8+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer"
9+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
11+
)
12+
13+
type balancerWithMeta struct {
14+
balancer *balancer.Balancer
15+
meta *meta.Meta
16+
}
17+
18+
func newBalancerWithMeta(b *balancer.Balancer, m *meta.Meta) *balancerWithMeta {
19+
return &balancerWithMeta{balancer: b, meta: m}
20+
}
21+
22+
func (b *balancerWithMeta) Invoke(ctx context.Context, method string, args any, reply any,
23+
opts ...grpc.CallOption,
24+
) error {
25+
metaCtx, err := b.meta.Context(ctx)
26+
if err != nil {
27+
return xerrors.WithStackTrace(err)
28+
}
29+
30+
return b.balancer.Invoke(metaCtx, method, args, reply, opts...)
31+
}
32+
33+
func (b *balancerWithMeta) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string,
34+
opts ...grpc.CallOption,
35+
) (grpc.ClientStream, error) {
36+
metaCtx, err := b.meta.Context(ctx)
37+
if err != nil {
38+
return nil, xerrors.WithStackTrace(err)
39+
}
40+
41+
return b.balancer.NewStream(metaCtx, desc, method, opts...)
42+
}
43+
44+
func (b *balancerWithMeta) Close(ctx context.Context) error {
45+
return b.balancer.Close(ctx)
46+
}

driver.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ type Driver struct {
9696
pool *conn.Pool
9797

9898
mtx sync.Mutex
99-
balancer *balancer.Balancer
99+
balancer *balancerWithMeta
100100

101101
children map[uint64]*Driver
102102
childrenMtx xsync.Mutex
@@ -416,10 +416,11 @@ func (d *Driver) connect(ctx context.Context) (err error) {
416416
d.pool = conn.NewPool(ctx, d.config)
417417
}
418418
if d.balancer == nil {
419-
d.balancer, err = balancer.New(ctx, d.config, d.pool, d.discoveryOptions...)
419+
b, err := balancer.New(ctx, d.config, d.pool, d.discoveryOptions...)
420420
if err != nil {
421421
return xerrors.WithStackTrace(err)
422422
}
423+
d.balancer = newBalancerWithMeta(b, d.config.Meta())
423424
}
424425

425426
d.table = xsync.OnceValue(func() (*internalTable.Client, error) {

internal/balancer/balancer.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -299,10 +299,6 @@ func (b *Balancer) wrapCall(ctx context.Context, f func(ctx context.Context, cc
299299
}
300300
}()
301301

302-
if ctx, err = b.driverConfig.Meta().Context(ctx); err != nil {
303-
return xerrors.WithStackTrace(err)
304-
}
305-
306302
if err = f(ctx, cc); err != nil {
307303
if conn.UseWrapping(ctx) {
308304
if credentials.IsAccessError(err) {

options.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,7 @@ func WithPanicCallback(panicCallback func(e interface{})) Option {
602602
// WithSharedBalancer sets balancer from parent driver to child driver
603603
func WithSharedBalancer(parent *Driver) Option {
604604
return func(ctx context.Context, c *Driver) error {
605-
c.balancer = parent.balancer
605+
c.balancer = newBalancerWithMeta(parent.balancer.balancer, c.config.Meta())
606606

607607
return nil
608608
}

0 commit comments

Comments
 (0)