Skip to content

Commit ab001f1

Browse files
committed
* Enabled server-side session balancing for sessions created from internal session pool
* Removed unused public `meta.Meta` methods * Renamed `meta.Meta.Meta(ctx)` public method to `meta.Meta.Context(ctx)` * Rollbacked default balancer to `balancers.RandomChoice()`
1 parent d1bff64 commit ab001f1

File tree

13 files changed

+136
-89
lines changed

13 files changed

+136
-89
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
* Enabled server-side session balancing for sessions created from internal session pool
2+
* Removed unused public `meta.Meta` methods
3+
* Renamed `meta.Meta.Meta(ctx)` public method to `meta.Meta.Context(ctx)`
4+
* Rollbacked default balancer to `balancers.RandomChoice()`
5+
16
## v3.29.1
27
* Changed default balancer to `balancers.PreferLocalDC(balancers.RandomChoice())`
38

balancers/balancers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,5 +102,5 @@ func PreferWithFallback(balancer *balancerConfig.Config, filter func(endpoint En
102102

103103
// Default balancer used by default
104104
func Default() *balancerConfig.Config {
105-
return PreferLocalDC(RandomChoice())
105+
return RandomChoice()
106106
}

balancers/balancers_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ func TestPreferLocalDC(t *testing.T) {
1616
&mock.Conn{AddrField: "2", State: conn.Online, LocationField: "2"},
1717
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "2"},
1818
}
19-
rr := PreferLocalDC(Default())
19+
rr := PreferLocalDC(RandomChoice())
2020
require.False(t, rr.AllowFalback)
2121
require.Equal(t, []conn.Conn{conns[1], conns[2]}, applyPreferFilter(balancerConfig.Info{SelfLocation: "2"}, rr, conns))
2222
}
@@ -27,7 +27,7 @@ func TestPreferLocalDCWithFallBack(t *testing.T) {
2727
&mock.Conn{AddrField: "2", State: conn.Online, LocationField: "2"},
2828
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "2"},
2929
}
30-
rr := PreferLocalDCWithFallBack(Default())
30+
rr := PreferLocalDCWithFallBack(RandomChoice())
3131
require.True(t, rr.AllowFalback)
3232
require.Equal(t, []conn.Conn{conns[1], conns[2]}, applyPreferFilter(balancerConfig.Info{SelfLocation: "2"}, rr, conns))
3333
}
@@ -39,7 +39,7 @@ func TestPreferLocations(t *testing.T) {
3939
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "two"},
4040
}
4141

42-
rr := PreferLocations(Default(), "zero", "two")
42+
rr := PreferLocations(RandomChoice(), "zero", "two")
4343
require.False(t, rr.AllowFalback)
4444
require.Equal(t, []conn.Conn{conns[0], conns[2]}, applyPreferFilter(balancerConfig.Info{}, rr, conns))
4545
}
@@ -51,7 +51,7 @@ func TestPreferLocationsWithFallback(t *testing.T) {
5151
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "two"},
5252
}
5353

54-
rr := PreferLocationsWithFallback(Default(), "zero", "two")
54+
rr := PreferLocationsWithFallback(RandomChoice(), "zero", "two")
5555
require.True(t, rr.AllowFalback)
5656
require.Equal(t, []conn.Conn{conns[0], conns[2]}, applyPreferFilter(balancerConfig.Info{}, rr, conns))
5757
}

config/config.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ type Config struct {
3232
secure bool
3333
endpoint string
3434
database string
35-
requestsType string
36-
userAgent string
35+
metaOptions []meta.Option
3736
grpcOptions []grpc.DialOption
3837
credentials credentials.Credentials
3938
tlsConfig *tls.Config
@@ -109,12 +108,6 @@ func (c Config) Balancer() *balancerConfig.Config {
109108
return c.balancerConfig
110109
}
111110

112-
// RequestsType set an additional type hint to all requests.
113-
// It is needed only for debug purposes and advanced cases.
114-
func (c Config) RequestsType() string {
115-
return c.requestsType
116-
}
117-
118111
type Option func(c *Config)
119112

120113
// WithInternalDNSResolver
@@ -169,7 +162,7 @@ func WithTrace(t trace.Driver, opts ...trace.DriverComposeOption) Option {
169162

170163
func WithUserAgent(userAgent string) Option {
171164
return func(c *Config) {
172-
c.userAgent = userAgent
165+
c.metaOptions = append(c.metaOptions, meta.WithUserAgentOption(userAgent))
173166
}
174167
}
175168

@@ -237,7 +230,7 @@ func WithBalancer(balancer *balancerConfig.Config) Option {
237230

238231
func WithRequestsType(requestsType string) Option {
239232
return func(c *Config) {
240-
c.requestsType = requestsType
233+
c.metaOptions = append(c.metaOptions, meta.WithRequestTypeOption(requestsType))
241234
}
242235
}
243236

@@ -287,8 +280,7 @@ func New(opts ...Option) Config {
287280
c.database,
288281
c.credentials,
289282
c.trace,
290-
c.requestsType,
291-
c.userAgent,
283+
c.metaOptions...,
292284
)
293285
return c
294286
}

internal/balancer/balancer.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2020
)
2121

22-
var ErrClusterEmpty = xerrors.Wrap(fmt.Errorf("cluster empty"))
22+
var ErrNoEndpoints = xerrors.Wrap(fmt.Errorf("no endpoints"))
2323

2424
type balancer struct {
2525
driverConfig config.Config
@@ -245,7 +245,7 @@ func (b *balancer) wrapCall(ctx context.Context, f func(ctx context.Context, cc
245245
}
246246
}()
247247

248-
if ctx, err = b.driverConfig.Meta().Meta(ctx); err != nil {
248+
if ctx, err = b.driverConfig.Meta().Context(ctx); err != nil {
249249
return xerrors.WithStackTrace(err)
250250
}
251251

@@ -289,7 +289,9 @@ func (b *balancer) getConn(ctx context.Context) (c conn.Conn, err error) {
289289

290290
c, failedCount = state.GetConnection(ctx)
291291
if c == nil {
292-
return nil, xerrors.WithStackTrace(ErrClusterEmpty)
292+
return nil, xerrors.WithStackTrace(
293+
fmt.Errorf("%w: cannot get connection from balancer after %d attempts", ErrNoEndpoints, failedCount),
294+
)
293295
}
294296
return c, nil
295297
}

internal/balancer/local_dc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func detectFastestEndpoint(ctx context.Context, endpoints []endpoint.Endpoint) (
109109

110110
func detectLocalDC(ctx context.Context, endpoints []endpoint.Endpoint) (string, error) {
111111
if len(endpoints) == 0 {
112-
return "", xerrors.WithStackTrace(ErrClusterEmpty)
112+
return "", xerrors.WithStackTrace(ErrNoEndpoints)
113113
}
114114
endpointsByDc := splitEndpointsByLocation(endpoints)
115115

internal/discovery/discovery.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func (c *Client) Discover(ctx context.Context) (endpoints []endpoint.Endpoint, e
6262
onDone(location, nodes, err)
6363
}()
6464

65-
ctx, err = c.config.Meta().Meta(ctx)
65+
ctx, err = c.config.Meta().Context(ctx)
6666
if err != nil {
6767
return nil, xerrors.WithStackTrace(err)
6868
}
@@ -110,7 +110,7 @@ func (c *Client) WhoAmI(ctx context.Context) (whoAmI *discovery.WhoAmI, err erro
110110
}
111111
}()
112112

113-
ctx, err = c.config.Meta().Meta(ctx)
113+
ctx, err = c.config.Meta().Context(ctx)
114114
if err != nil {
115115
return nil, xerrors.WithStackTrace(err)
116116
}

internal/meta/context.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,12 @@ func WithUserAgent(ctx context.Context, userAgent string) context.Context {
2020
func WithRequestType(ctx context.Context, requestType string) context.Context {
2121
return metadata.AppendToOutgoingContext(ctx, HeaderRequestType, requestType)
2222
}
23+
24+
// WithAllowFeatures returns a copy of parent context with allowed client feature
25+
func WithAllowFeatures(ctx context.Context, features ...string) context.Context {
26+
kv := make([]string, 0, len(features)*2)
27+
for _, feature := range features {
28+
kv = append(kv, HeaderClientCapabilities, feature)
29+
}
30+
return metadata.AppendToOutgoingContext(ctx, kv...)
31+
}

internal/meta/meta.go

Lines changed: 59 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -13,79 +13,85 @@ import (
1313

1414
const (
1515
// outgoing headers
16-
HeaderDatabase = "x-ydb-database"
17-
HeaderTicket = "x-ydb-auth-ticket"
18-
HeaderVersion = "x-ydb-sdk-build-info"
19-
HeaderRequestType = "x-ydb-request-type"
20-
HeaderTraceID = "x-ydb-trace-id"
21-
HeaderUserAgent = "x-ydb-user-agent"
16+
HeaderDatabase = "x-ydb-database"
17+
HeaderTicket = "x-ydb-auth-ticket"
18+
HeaderVersion = "x-ydb-sdk-build-info"
19+
HeaderRequestType = "x-ydb-request-type"
20+
HeaderTraceID = "x-ydb-trace-id"
21+
HeaderUserAgent = "x-ydb-user-agent"
22+
HeaderClientCapabilities = "x-ydb-client-capabilities"
23+
24+
// outgoing hints
25+
HintSessionBalancer = "session-balancer"
2226

2327
// incomming headers
2428
HeaderServerHints = "x-ydb-server-hints"
2529

26-
// hints
30+
// incoming hints
2731
HintSessionClose = "session-close"
2832
)
2933

3034
type Meta interface {
31-
Meta(ctx context.Context) (context.Context, error)
32-
33-
WithDatabase(database string) Meta
34-
WithCredentials(creds credentials.Credentials) Meta
35-
WithUserAgent(userAgent string) Meta
36-
37-
Database() string
38-
UserAgent() string
35+
Context(ctx context.Context) (context.Context, error)
3936
}
4037

4138
func New(
4239
database string,
4340
credentials credentials.Credentials,
4441
trace trace.Driver,
45-
requestsType string,
46-
userAgent string,
42+
opts ...Option,
4743
) Meta {
48-
return &meta{
49-
trace: trace,
50-
credentials: credentials,
51-
database: database,
52-
requestsType: requestsType,
53-
userAgent: userAgent,
44+
m := &meta{
45+
trace: trace,
46+
credentials: credentials,
47+
database: database,
5448
}
49+
for _, o := range opts {
50+
o(m)
51+
}
52+
return m
5553
}
5654

57-
type meta struct {
58-
trace trace.Driver
59-
credentials credentials.Credentials
60-
database string
61-
requestsType string
62-
userAgent string
63-
}
55+
type Option func(m *meta)
6456

65-
func (m *meta) Database() string {
66-
return m.database
57+
func WithUserAgentOption(userAgent string) Option {
58+
return func(m *meta) {
59+
m.userAgent = userAgent
60+
}
6761
}
6862

69-
func (m *meta) UserAgent() string {
70-
return m.userAgent
63+
func WithRequestTypeOption(requestType string) Option {
64+
return func(m *meta) {
65+
m.requestsType = requestType
66+
}
7167
}
7268

73-
func (m *meta) WithDatabase(database string) Meta {
74-
mm := *m
75-
mm.database = database
76-
return &mm
69+
func AllowOption(feature string) Option {
70+
return func(m *meta) {
71+
m.capabilities = append(m.capabilities, feature)
72+
}
7773
}
7874

79-
func (m *meta) WithCredentials(creds credentials.Credentials) Meta {
80-
mm := *m
81-
mm.credentials = creds
82-
return &mm
75+
func ForbidOption(feature string) Option {
76+
return func(m *meta) {
77+
n := 0
78+
for _, capability := range m.capabilities {
79+
if capability != feature {
80+
m.capabilities[n] = capability
81+
n++
82+
}
83+
}
84+
m.capabilities = m.capabilities[:n]
85+
}
8386
}
8487

85-
func (m *meta) WithUserAgent(userAgent string) Meta {
86-
mm := *m
87-
mm.userAgent = userAgent
88-
return &mm
88+
type meta struct {
89+
trace trace.Driver
90+
credentials credentials.Credentials
91+
database string
92+
requestsType string
93+
userAgent string
94+
capabilities []string
8995
}
9096

9197
func (m *meta) meta(ctx context.Context) (_ metadata.MD, err error) {
@@ -114,15 +120,19 @@ func (m *meta) meta(ctx context.Context) (_ metadata.MD, err error) {
114120
}
115121
}
116122

123+
if len(m.capabilities) > 0 {
124+
md.Append(HeaderClientCapabilities, m.capabilities...)
125+
}
126+
117127
if m.credentials == nil {
118128
return md, nil
119129
}
120130

121131
var token string
122132

123-
getCredentialsDone := trace.DriverOnGetCredentials(m.trace, &ctx)
133+
done := trace.DriverOnGetCredentials(m.trace, &ctx)
124134
defer func() {
125-
getCredentialsDone(token, err)
135+
done(token, err)
126136
}()
127137

128138
token, err = m.credentials.Token(ctx)
@@ -138,7 +148,7 @@ func (m *meta) meta(ctx context.Context) (_ metadata.MD, err error) {
138148
return md, nil
139149
}
140150

141-
func (m *meta) Meta(ctx context.Context) (_ context.Context, err error) {
151+
func (m *meta) Context(ctx context.Context) (_ context.Context, err error) {
142152
md, err := m.meta(ctx)
143153
if err != nil {
144154
return ctx, xerrors.WithStackTrace(err)

internal/meta/test/meta_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ func TestMetaRequiredHeaders(t *testing.T) {
1717
"database",
1818
credentials.NewAccessTokenCredentials("token", "TestMetaRequiredHeaders"),
1919
trace.Driver{},
20-
"requestType",
21-
"user-agent",
20+
meta.WithRequestTypeOption("requestType"),
21+
meta.WithUserAgentOption("user-agent"),
2222
)
2323

2424
ctx := context.Background()
@@ -29,7 +29,7 @@ func TestMetaRequiredHeaders(t *testing.T) {
2929

3030
ctx = metadata.AppendToOutgoingContext(ctx, "some-user-header", "some-user-value")
3131

32-
ctx, err := m.Meta(ctx)
32+
ctx, err := m.Context(ctx)
3333
if err != nil {
3434
t.Fatal(err)
3535
}

0 commit comments

Comments
 (0)