Skip to content

Commit d485a07

Browse files
committed
balancer config name
1 parent 4e5ae05 commit d485a07

File tree

10 files changed

+162
-75
lines changed

10 files changed

+162
-75
lines changed

balancers/balancers.go

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package balancers
22

33
import (
4+
"sort"
45
"strings"
56

67
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
78
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
9+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xstring"
810
)
911

1012
// Deprecated: RoundRobin is RandomChoice now
@@ -22,14 +24,22 @@ func SingleConn() *balancerConfig.Config {
2224
}
2325
}
2426

27+
type filterLocalDC struct{}
28+
29+
func (filterLocalDC) Allow(info balancerConfig.Info, c conn.Conn) bool {
30+
return c.Endpoint().Location() == info.SelfLocation
31+
}
32+
33+
func (filterLocalDC) String() string {
34+
return "LocalDC"
35+
}
36+
2537
// PreferLocalDC creates balancer which use endpoints only in location such as initial endpoint location
2638
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
2739
// PreferLocalDC balancer try to autodetect local DC from client side.
2840
func PreferLocalDC(balancer *balancerConfig.Config) *balancerConfig.Config {
29-
balancer.IsPreferConn = func(info balancerConfig.Info, c conn.Conn) bool {
30-
return c.Endpoint().Location() == info.SelfLocation
31-
}
32-
balancer.DetectlocalDC = true
41+
balancer.Filter = filterLocalDC{}
42+
balancer.DetectLocalDC = true
3343
return balancer
3444
}
3545

@@ -38,10 +48,38 @@ func PreferLocalDC(balancer *balancerConfig.Config) *balancerConfig.Config {
3848
// If filter returned zero endpoints from all discovery endpoints list - used all endpoint instead
3949
func PreferLocalDCWithFallBack(balancer *balancerConfig.Config) *balancerConfig.Config {
4050
balancer = PreferLocalDC(balancer)
41-
balancer.AllowFalback = true
51+
balancer.AllowFallback = true
4252
return balancer
4353
}
4454

55+
type filterLocations []string
56+
57+
func (locations filterLocations) Allow(_ balancerConfig.Info, c conn.Conn) bool {
58+
location := strings.ToUpper(c.Endpoint().Location())
59+
for _, l := range locations {
60+
if location == l {
61+
return true
62+
}
63+
}
64+
return false
65+
}
66+
67+
func (locations filterLocations) String() string {
68+
buffer := xstring.Buffer()
69+
defer buffer.Free()
70+
71+
buffer.WriteString("Locations{")
72+
for i, l := range locations {
73+
if i != 0 {
74+
buffer.WriteByte(',')
75+
}
76+
buffer.WriteString(l)
77+
}
78+
buffer.WriteByte('}')
79+
80+
return buffer.String()
81+
}
82+
4583
// PreferLocations creates balancer which use endpoints only in selected locations (such as "ABC", "DEF", etc.)
4684
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
4785
func PreferLocations(balancer *balancerConfig.Config, locations ...string) *balancerConfig.Config {
@@ -51,15 +89,8 @@ func PreferLocations(balancer *balancerConfig.Config, locations ...string) *bala
5189
for i := range locations {
5290
locations[i] = strings.ToUpper(locations[i])
5391
}
54-
balancer.IsPreferConn = func(_ balancerConfig.Info, c conn.Conn) bool {
55-
location := strings.ToUpper(c.Endpoint().Location())
56-
for _, l := range locations {
57-
if location == l {
58-
return true
59-
}
60-
}
61-
return false
62-
}
92+
sort.Strings(locations)
93+
balancer.Filter = filterLocations(locations)
6394
return balancer
6495
}
6596

@@ -68,7 +99,7 @@ func PreferLocations(balancer *balancerConfig.Config, locations ...string) *bala
6899
// If filter returned zero endpoints from all discovery endpoints list - used all endpoint instead
69100
func PreferLocationsWithFallback(balancer *balancerConfig.Config, locations ...string) *balancerConfig.Config {
70101
balancer = PreferLocations(balancer, locations...)
71-
balancer.AllowFalback = true
102+
balancer.AllowFallback = true
72103
return balancer
73104
}
74105

@@ -82,12 +113,22 @@ type Endpoint interface {
82113
LocalDC() bool
83114
}
84115

116+
type filterFunc func(info balancerConfig.Info, c conn.Conn) bool
117+
118+
func (p filterFunc) Allow(info balancerConfig.Info, c conn.Conn) bool {
119+
return p(info, c)
120+
}
121+
122+
func (p filterFunc) String() string {
123+
return "Custom"
124+
}
125+
85126
// Prefer creates balancer which use endpoints by filter
86127
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter
87128
func Prefer(balancer *balancerConfig.Config, filter func(endpoint Endpoint) bool) *balancerConfig.Config {
88-
balancer.IsPreferConn = func(_ balancerConfig.Info, c conn.Conn) bool {
129+
balancer.Filter = filterFunc(func(_ balancerConfig.Info, c conn.Conn) bool {
89130
return filter(c.Endpoint())
90-
}
131+
})
91132
return balancer
92133
}
93134

@@ -96,7 +137,7 @@ func Prefer(balancer *balancerConfig.Config, filter func(endpoint Endpoint) bool
96137
// If filter returned zero endpoints from all discovery endpoints list - used all endpoint instead
97138
func PreferWithFallback(balancer *balancerConfig.Config, filter func(endpoint Endpoint) bool) *balancerConfig.Config {
98139
balancer = Prefer(balancer, filter)
99-
balancer.AllowFalback = true
140+
balancer.AllowFallback = true
100141
return balancer
101142
}
102143

balancers/balancers_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func TestPreferLocalDC(t *testing.T) {
1717
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "2"},
1818
}
1919
rr := PreferLocalDC(RandomChoice())
20-
require.False(t, rr.AllowFalback)
20+
require.False(t, rr.AllowFallback)
2121
require.Equal(t, []conn.Conn{conns[1], conns[2]}, applyPreferFilter(balancerConfig.Info{SelfLocation: "2"}, rr, conns))
2222
}
2323

@@ -28,7 +28,7 @@ func TestPreferLocalDCWithFallBack(t *testing.T) {
2828
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "2"},
2929
}
3030
rr := PreferLocalDCWithFallBack(RandomChoice())
31-
require.True(t, rr.AllowFalback)
31+
require.True(t, rr.AllowFallback)
3232
require.Equal(t, []conn.Conn{conns[1], conns[2]}, applyPreferFilter(balancerConfig.Info{SelfLocation: "2"}, rr, conns))
3333
}
3434

@@ -40,7 +40,7 @@ func TestPreferLocations(t *testing.T) {
4040
}
4141

4242
rr := PreferLocations(RandomChoice(), "zero", "two")
43-
require.False(t, rr.AllowFalback)
43+
require.False(t, rr.AllowFallback)
4444
require.Equal(t, []conn.Conn{conns[0], conns[2]}, applyPreferFilter(balancerConfig.Info{}, rr, conns))
4545
}
4646

@@ -52,17 +52,17 @@ func TestPreferLocationsWithFallback(t *testing.T) {
5252
}
5353

5454
rr := PreferLocationsWithFallback(RandomChoice(), "zero", "two")
55-
require.True(t, rr.AllowFalback)
55+
require.True(t, rr.AllowFallback)
5656
require.Equal(t, []conn.Conn{conns[0], conns[2]}, applyPreferFilter(balancerConfig.Info{}, rr, conns))
5757
}
5858

5959
func applyPreferFilter(info balancerConfig.Info, b *balancerConfig.Config, conns []conn.Conn) []conn.Conn {
60-
if b.IsPreferConn == nil {
61-
b.IsPreferConn = func(info balancerConfig.Info, c conn.Conn) bool { return true }
60+
if b.Filter == nil {
61+
b.Filter = filterFunc(func(info balancerConfig.Info, c conn.Conn) bool { return true })
6262
}
6363
res := make([]conn.Conn, 0, len(conns))
6464
for _, c := range conns {
65-
if b.IsPreferConn(info, c) {
65+
if b.Filter.Allow(info, c) {
6666
res = append(res, c)
6767
}
6868
}

balancers/config.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ const (
2020
type preferType string
2121

2222
const (
23-
preferLocalDC = preferType("local_dc")
24-
preferLocations = preferType("locations")
23+
preferTypeLocalDC = preferType("local_dc")
24+
preferTypeLocations = preferType("locations")
2525
)
2626

2727
type balancersConfig struct {
@@ -88,12 +88,12 @@ func CreateFromConfig(s string) (*balancerConfig.Config, error) {
8888
}
8989

9090
switch c.Prefer {
91-
case preferLocalDC:
91+
case preferTypeLocalDC:
9292
if c.Fallback {
9393
return PreferLocalDCWithFallBack(b), nil
9494
}
9595
return PreferLocalDC(b), nil
96-
case preferLocations:
96+
case preferTypeLocations:
9797
if len(c.Locations) == 0 {
9898
return nil, xerrors.WithStackTrace(fmt.Errorf("empty locations list in balancer '%s' config", c.Type))
9999
}

balancers/config_test.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,11 @@ func TestFromConfig(t *testing.T) {
7070
"prefer": "local_dc"
7171
}`,
7272
res: balancerConfig.Config{
73-
DetectlocalDC: true,
74-
IsPreferConn: func(info balancerConfig.Info, c conn.Conn) bool {
73+
DetectLocalDC: true,
74+
Filter: filterFunc(func(info balancerConfig.Info, c conn.Conn) bool {
7575
// some non nil func
7676
return false
77-
},
77+
}),
7878
},
7979
},
8080
{
@@ -93,12 +93,12 @@ func TestFromConfig(t *testing.T) {
9393
"fallback": true
9494
}`,
9595
res: balancerConfig.Config{
96-
AllowFalback: true,
97-
DetectlocalDC: true,
98-
IsPreferConn: func(info balancerConfig.Info, c conn.Conn) bool {
96+
AllowFallback: true,
97+
DetectLocalDC: true,
98+
Filter: filterFunc(func(info balancerConfig.Info, c conn.Conn) bool {
9999
// some non nil func
100100
return false
101-
},
101+
}),
102102
},
103103
},
104104
{
@@ -109,10 +109,10 @@ func TestFromConfig(t *testing.T) {
109109
"locations": ["AAA", "BBB", "CCC"]
110110
}`,
111111
res: balancerConfig.Config{
112-
IsPreferConn: func(info balancerConfig.Info, c conn.Conn) bool {
112+
Filter: filterFunc(func(info balancerConfig.Info, c conn.Conn) bool {
113113
// some non nil func
114114
return false
115-
},
115+
}),
116116
},
117117
},
118118
{
@@ -124,11 +124,11 @@ func TestFromConfig(t *testing.T) {
124124
"fallback": true
125125
}`,
126126
res: balancerConfig.Config{
127-
AllowFalback: true,
128-
IsPreferConn: func(info balancerConfig.Info, c conn.Conn) bool {
127+
AllowFallback: true,
128+
Filter: filterFunc(func(info balancerConfig.Info, c conn.Conn) bool {
129129
// some non nil func
130130
return false
131-
},
131+
}),
132132
},
133133
},
134134
} {
@@ -155,10 +155,10 @@ func TestFromConfig(t *testing.T) {
155155
}
156156

157157
// function pointers can check equal to nil only
158-
if tt.res.IsPreferConn != nil {
159-
require.NotNil(t, b.IsPreferConn)
160-
b.IsPreferConn = nil
161-
tt.res.IsPreferConn = nil
158+
if tt.res.Filter != nil {
159+
require.NotNil(t, b.Filter)
160+
b.Filter = nil
161+
tt.res.Filter = nil
162162
}
163163

164164
require.Equal(t, &tt.res, b)

internal/balancer/balancer.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
114114
return xerrors.WithStackTrace(err)
115115
}
116116

117-
if b.balancerConfig.DetectlocalDC {
117+
if b.balancerConfig.DetectLocalDC {
118118
localDC, err = b.localDCDetector(ctx, endpoints)
119119
if err != nil {
120120
return xerrors.WithStackTrace(err)
@@ -128,7 +128,7 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
128128

129129
func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []endpoint.Endpoint, localDC string) {
130130
onDone := trace.DriverOnBalancerUpdate(
131-
b.driverConfig.Trace(), &ctx, stack.FunctionID(0), b.balancerConfig.DetectlocalDC,
131+
b.driverConfig.Trace(), &ctx, stack.FunctionID(0), b.balancerConfig.DetectLocalDC,
132132
)
133133
defer func() {
134134
nodes := make([]trace.EndpointInfo, 0, len(endpoints))
@@ -145,7 +145,7 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end
145145
}
146146

147147
info := balancerConfig.Info{SelfLocation: localDC}
148-
state := newConnectionsState(connections, b.balancerConfig.IsPreferConn, info, b.balancerConfig.AllowFalback)
148+
state := newConnectionsState(connections, b.balancerConfig.Filter, info, b.balancerConfig.AllowFallback)
149149

150150
endpointsInfo := make([]endpoint.Info, len(endpoints))
151151
for i, e := range endpoints {
@@ -187,7 +187,7 @@ func New(
187187
) (b *Balancer, finalErr error) {
188188
var (
189189
onDone = trace.DriverOnBalancerInit(
190-
driverConfig.Trace(), &ctx, stack.FunctionID(0),
190+
driverConfig.Trace(), &ctx, stack.FunctionID(0), driverConfig.Balancer().String(),
191191
)
192192
discoveryConfig = discoveryConfig.New(append(opts,
193193
discoveryConfig.With(driverConfig.Common),
Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,52 @@
11
package config
22

3-
import "github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
3+
import (
4+
"fmt"
5+
6+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
7+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xstring"
8+
)
49

510
// Dedicated package need for prevent cyclo dependencies config -> balancer -> config
611

712
type Config struct {
8-
IsPreferConn PreferConnFunc
9-
AllowFalback bool
13+
Filter Filter
14+
AllowFallback bool
1015
SingleConn bool
11-
DetectlocalDC bool
16+
DetectLocalDC bool
17+
}
18+
19+
func (c Config) String() string {
20+
if c.SingleConn {
21+
return "SingleConn"
22+
}
23+
24+
buffer := xstring.Buffer()
25+
defer buffer.Free()
26+
27+
buffer.WriteString("RandomChoice{")
28+
29+
buffer.WriteString("DetectLocalDC=")
30+
fmt.Fprintf(buffer, "%t", c.DetectLocalDC)
31+
32+
buffer.WriteString(",AllowFallback=")
33+
fmt.Fprintf(buffer, "%t", c.AllowFallback)
34+
35+
if c.Filter != nil {
36+
buffer.WriteString(",Filter=")
37+
fmt.Fprint(buffer, c.Filter.String())
38+
}
39+
40+
buffer.WriteByte('}')
41+
42+
return buffer.String()
1243
}
1344

1445
type Info struct {
1546
SelfLocation string
1647
}
1748

18-
type PreferConnFunc func(info Info, c conn.Conn) bool
49+
type Filter interface {
50+
Allow(info Info, c conn.Conn) bool
51+
String() string
52+
}

0 commit comments

Comments
 (0)