Skip to content

Commit 8121837

Browse files
authored
Merge pull request #1347 from ydb-platform/balancers-filter
* Changed signature of filter func in balancers (replaced argument from `conn.Conn` type to `endpoint.Info`)
2 parents f9cff99 + 54a90ad commit 8121837

File tree

7 files changed

+39
-37
lines changed

7 files changed

+39
-37
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
* Fixed return more than one row error if real error raised on try read next row
22
* Fixed checking errors for session must be deleted
3+
* Changed signature of filter func in balancers (replaced argument from `conn.Conn` type to `endpoint.Info`)
34

45
## v3.75.0
56
* Improve config validation before start topic reader

balancers/balancers.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"strings"
66

77
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
8-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
8+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
99
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xslices"
1010
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xstring"
1111
)
@@ -29,8 +29,8 @@ func SingleConn() *balancerConfig.Config {
2929

3030
type filterLocalDC struct{}
3131

32-
func (filterLocalDC) Allow(info balancerConfig.Info, c conn.Conn) bool {
33-
return c.Endpoint().Location() == info.SelfLocation
32+
func (filterLocalDC) Allow(info balancerConfig.Info, e endpoint.Info) bool {
33+
return e.Location() == info.SelfLocation
3434
}
3535

3636
func (filterLocalDC) String() string {
@@ -59,8 +59,8 @@ func PreferLocalDCWithFallBack(balancer *balancerConfig.Config) *balancerConfig.
5959

6060
type filterLocations []string
6161

62-
func (locations filterLocations) Allow(_ balancerConfig.Info, c conn.Conn) bool {
63-
location := strings.ToUpper(c.Endpoint().Location())
62+
func (locations filterLocations) Allow(_ balancerConfig.Info, e endpoint.Info) bool {
63+
location := strings.ToUpper(e.Location())
6464
for _, l := range locations {
6565
if location == l {
6666
return true
@@ -127,10 +127,10 @@ type Endpoint interface {
127127
LocalDC() bool
128128
}
129129

130-
type filterFunc func(info balancerConfig.Info, c conn.Conn) bool
130+
type filterFunc func(info balancerConfig.Info, e endpoint.Info) bool
131131

132-
func (p filterFunc) Allow(info balancerConfig.Info, c conn.Conn) bool {
133-
return p(info, c)
132+
func (p filterFunc) Allow(info balancerConfig.Info, e endpoint.Info) bool {
133+
return p(info, e)
134134
}
135135

136136
func (p filterFunc) String() string {
@@ -140,8 +140,8 @@ func (p filterFunc) String() string {
140140
// Prefer creates balancer which use endpoints by filter
141141
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter
142142
func Prefer(balancer *balancerConfig.Config, filter func(endpoint Endpoint) bool) *balancerConfig.Config {
143-
balancer.Filter = filterFunc(func(_ balancerConfig.Info, c conn.Conn) bool {
144-
return filter(c.Endpoint())
143+
balancer.Filter = filterFunc(func(_ balancerConfig.Info, e endpoint.Info) bool {
144+
return filter(e)
145145
})
146146

147147
return balancer

balancers/balancers_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
99
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1011
"github.com/ydb-platform/ydb-go-sdk/v3/internal/mock"
1112
)
1213

@@ -58,11 +59,11 @@ func TestPreferLocationsWithFallback(t *testing.T) {
5859

5960
func applyPreferFilter(info balancerConfig.Info, b *balancerConfig.Config, conns []conn.Conn) []conn.Conn {
6061
if b.Filter == nil {
61-
b.Filter = filterFunc(func(info balancerConfig.Info, c conn.Conn) bool { return true })
62+
b.Filter = filterFunc(func(info balancerConfig.Info, e endpoint.Info) bool { return true })
6263
}
6364
res := make([]conn.Conn, 0, len(conns))
6465
for _, c := range conns {
65-
if b.Filter.Allow(info, c) {
66+
if b.Filter.Allow(info, c.Endpoint()) {
6667
res = append(res, c)
6768
}
6869
}

balancers/config_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"github.com/stretchr/testify/require"
77

88
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
9-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
9+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1010
)
1111

1212
func TestFromConfig(t *testing.T) {
@@ -71,7 +71,7 @@ func TestFromConfig(t *testing.T) {
7171
}`,
7272
res: balancerConfig.Config{
7373
DetectLocalDC: true,
74-
Filter: filterFunc(func(info balancerConfig.Info, c conn.Conn) bool {
74+
Filter: filterFunc(func(info balancerConfig.Info, e endpoint.Info) bool {
7575
// some non nil func
7676
return false
7777
}),
@@ -95,7 +95,7 @@ func TestFromConfig(t *testing.T) {
9595
res: balancerConfig.Config{
9696
AllowFallback: true,
9797
DetectLocalDC: true,
98-
Filter: filterFunc(func(info balancerConfig.Info, c conn.Conn) bool {
98+
Filter: filterFunc(func(info balancerConfig.Info, e endpoint.Info) bool {
9999
// some non nil func
100100
return false
101101
}),
@@ -109,7 +109,7 @@ func TestFromConfig(t *testing.T) {
109109
"locations": ["AAA", "BBB", "CCC"]
110110
}`,
111111
res: balancerConfig.Config{
112-
Filter: filterFunc(func(info balancerConfig.Info, c conn.Conn) bool {
112+
Filter: filterFunc(func(info balancerConfig.Info, e endpoint.Info) bool {
113113
// some non nil func
114114
return false
115115
}),
@@ -125,7 +125,7 @@ func TestFromConfig(t *testing.T) {
125125
}`,
126126
res: balancerConfig.Config{
127127
AllowFallback: true,
128-
Filter: filterFunc(func(info balancerConfig.Info, c conn.Conn) bool {
128+
Filter: filterFunc(func(info balancerConfig.Info, e endpoint.Info) bool {
129129
// some non nil func
130130
return false
131131
}),

internal/balancer/config/routerconfig.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package config
33
import (
44
"fmt"
55

6-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
6+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
77
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xstring"
88
)
99

@@ -47,6 +47,6 @@ type Info struct {
4747
}
4848

4949
type Filter interface {
50-
Allow(info Info, c conn.Conn) bool
50+
Allow(info Info, e endpoint.Info) bool
5151
String() string
5252
}

internal/balancer/connections_state.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func sortPreferConnections(
157157
}
158158

159159
for _, c := range conns {
160-
if filter.Allow(info, c) {
160+
if filter.Allow(info, c.Endpoint()) {
161161
prefer = append(prefer, c)
162162
} else if allowFallback {
163163
fallback = append(fallback, c)

internal/balancer/connections_state_test.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@ func TestConnsToNodeIDMap(t *testing.T) {
6666
}
6767
}
6868

69-
type filterFunc func(info balancerConfig.Info, c conn.Conn) bool
69+
type filterFunc func(info balancerConfig.Info, e endpoint.Info) bool
7070

71-
func (f filterFunc) Allow(info balancerConfig.Info, c conn.Conn) bool {
72-
return f(info, c)
71+
func (f filterFunc) Allow(info balancerConfig.Info, e endpoint.Info) bool {
72+
return f(info, e)
7373
}
7474

7575
func (f filterFunc) String() string {
@@ -116,8 +116,8 @@ func TestSortPreferConnections(t *testing.T) {
116116
&mock.Conn{AddrField: "f2"},
117117
},
118118
allowFallback: false,
119-
filter: filterFunc(func(_ balancerConfig.Info, c conn.Conn) bool {
120-
return strings.HasPrefix(c.Endpoint().Address(), "t")
119+
filter: filterFunc(func(_ balancerConfig.Info, e endpoint.Info) bool {
120+
return strings.HasPrefix(e.Address(), "t")
121121
}),
122122
prefer: []conn.Conn{
123123
&mock.Conn{AddrField: "t1"},
@@ -134,8 +134,8 @@ func TestSortPreferConnections(t *testing.T) {
134134
&mock.Conn{AddrField: "f2"},
135135
},
136136
allowFallback: true,
137-
filter: filterFunc(func(_ balancerConfig.Info, c conn.Conn) bool {
138-
return strings.HasPrefix(c.Endpoint().Address(), "t")
137+
filter: filterFunc(func(_ balancerConfig.Info, e endpoint.Info) bool {
138+
return strings.HasPrefix(e.Address(), "t")
139139
}),
140140
prefer: []conn.Conn{
141141
&mock.Conn{AddrField: "t1"},
@@ -287,8 +287,8 @@ func TestNewState(t *testing.T) {
287287
&mock.Conn{AddrField: "f1", NodeIDField: 2, LocationField: "f"},
288288
&mock.Conn{AddrField: "t2", NodeIDField: 3, LocationField: "t"},
289289
&mock.Conn{AddrField: "f2", NodeIDField: 4, LocationField: "f"},
290-
}, filterFunc(func(info balancerConfig.Info, c conn.Conn) bool {
291-
return info.SelfLocation == c.Endpoint().Location()
290+
}, filterFunc(func(info balancerConfig.Info, e endpoint.Info) bool {
291+
return info.SelfLocation == e.Location()
292292
}), balancerConfig.Info{SelfLocation: "t"}, false),
293293
res: &connectionsState{
294294
connByNodeID: map[uint32]conn.Conn{
@@ -315,8 +315,8 @@ func TestNewState(t *testing.T) {
315315
&mock.Conn{AddrField: "f1", NodeIDField: 2, LocationField: "f"},
316316
&mock.Conn{AddrField: "t2", NodeIDField: 3, LocationField: "t"},
317317
&mock.Conn{AddrField: "f2", NodeIDField: 4, LocationField: "f"},
318-
}, filterFunc(func(info balancerConfig.Info, c conn.Conn) bool {
319-
return info.SelfLocation == c.Endpoint().Location()
318+
}, filterFunc(func(info balancerConfig.Info, e endpoint.Info) bool {
319+
return info.SelfLocation == e.Location()
320320
}), balancerConfig.Info{SelfLocation: "t"}, true),
321321
res: &connectionsState{
322322
connByNodeID: map[uint32]conn.Conn{
@@ -348,8 +348,8 @@ func TestNewState(t *testing.T) {
348348
&mock.Conn{AddrField: "f1", NodeIDField: 2, LocationField: "f"},
349349
&mock.Conn{AddrField: "t2", NodeIDField: 3, LocationField: "t"},
350350
&mock.Conn{AddrField: "f2", NodeIDField: 4, LocationField: "f"},
351-
}, filterFunc(func(info balancerConfig.Info, c conn.Conn) bool {
352-
return info.SelfLocation == c.Endpoint().Location()
351+
}, filterFunc(func(info balancerConfig.Info, e endpoint.Info) bool {
352+
return info.SelfLocation == e.Location()
353353
}), balancerConfig.Info{SelfLocation: "t"}, true),
354354
res: &connectionsState{
355355
connByNodeID: map[uint32]conn.Conn{
@@ -413,8 +413,8 @@ func TestConnection(t *testing.T) {
413413
s := newConnectionsState([]conn.Conn{
414414
&mock.Conn{AddrField: "t1", State: conn.Banned, LocationField: "t"},
415415
&mock.Conn{AddrField: "f2", State: conn.Banned, LocationField: "f"},
416-
}, filterFunc(func(info balancerConfig.Info, c conn.Conn) bool {
417-
return c.Endpoint().Location() == info.SelfLocation
416+
}, filterFunc(func(info balancerConfig.Info, e endpoint.Info) bool {
417+
return e.Location() == info.SelfLocation
418418
}), balancerConfig.Info{}, true)
419419
preferred := 0
420420
fallback := 0
@@ -436,8 +436,8 @@ func TestConnection(t *testing.T) {
436436
s := newConnectionsState([]conn.Conn{
437437
&mock.Conn{AddrField: "t1", State: conn.Banned, LocationField: "t"},
438438
&mock.Conn{AddrField: "f2", State: conn.Online, LocationField: "f"},
439-
}, filterFunc(func(info balancerConfig.Info, c conn.Conn) bool {
440-
return c.Endpoint().Location() == info.SelfLocation
439+
}, filterFunc(func(info balancerConfig.Info, e endpoint.Info) bool {
440+
return e.Location() == info.SelfLocation
441441
}), balancerConfig.Info{SelfLocation: "t"}, true)
442442
c, failed := s.GetConnection(context.Background())
443443
require.Equal(t, &mock.Conn{AddrField: "f2", State: conn.Online, LocationField: "f"}, c)

0 commit comments

Comments
 (0)