Skip to content

Commit d25adf4

Browse files
authored
Merge pull request #245 from ydb-platform/234-prefer-local-dc
Detect local DC
2 parents afee622 + e3ec5cb commit d25adf4

File tree

18 files changed

+1141
-289
lines changed

18 files changed

+1141
-289
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
* Implemented detection of local data-center with measuring tcp dial RTT
2+
* Added `trace.Driver.OnBalancer{Init,Close,ChooseEndpoint,Update}` events
3+
* Marked the driver cluster events as deprecated
14
* Simplified the balancing logic
25

36
## v3.25.3

balancers/balancers.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ func SingleConn() *routerconfig.Config {
2424

2525
// PreferLocalDC creates balancer which use endpoints only in location such as initial endpoint location
2626
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter by location
27+
// PreferLocalDC balancer try to autodetect local DC from client side.
2728
func PreferLocalDC(balancer *routerconfig.Config) *routerconfig.Config {
28-
balancer.IsPreferConn = func(c conn.Conn) bool {
29-
return c.Endpoint().LocalDC()
29+
balancer.IsPreferConn = func(routerInfo routerconfig.Info, c conn.Conn) bool {
30+
return c.Endpoint().Location() == routerInfo.SelfLocation
3031
}
32+
balancer.DetectlocalDC = true
3133
return balancer
3234
}
3335

@@ -49,7 +51,7 @@ func PreferLocations(balancer *routerconfig.Config, locations ...string) *router
4951
for i := range locations {
5052
locations[i] = strings.ToUpper(locations[i])
5153
}
52-
balancer.IsPreferConn = func(c conn.Conn) bool {
54+
balancer.IsPreferConn = func(_ routerconfig.Info, c conn.Conn) bool {
5355
location := strings.ToUpper(c.Endpoint().Location())
5456
for _, l := range locations {
5557
if location == l {
@@ -74,13 +76,16 @@ type Endpoint interface {
7476
NodeID() uint32
7577
Address() string
7678
Location() string
79+
80+
// Deprecated: LocalDC check "local" by compare endpoint location with discovery "selflocation" field.
81+
// It work good only if connection url always point to local dc.
7782
LocalDC() bool
7883
}
7984

8085
// Prefer creates balancer which use endpoints by filter
8186
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter
8287
func Prefer(balancer *routerconfig.Config, filter func(endpoint Endpoint) bool) *routerconfig.Config {
83-
balancer.IsPreferConn = func(c conn.Conn) bool {
88+
balancer.IsPreferConn = func(_ routerconfig.Info, c conn.Conn) bool {
8489
return filter(c.Endpoint())
8590
}
8691
return balancer

balancers/balancers_test.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,24 @@ import (
1212

1313
func TestPreferLocalDC(t *testing.T) {
1414
conns := []conn.Conn{
15-
&mock.Conn{AddrField: "1", LocalDCField: false},
16-
&mock.Conn{AddrField: "2", State: conn.Online, LocalDCField: true},
17-
&mock.Conn{AddrField: "3", State: conn.Online, LocalDCField: true},
15+
&mock.Conn{AddrField: "1", LocationField: "1"},
16+
&mock.Conn{AddrField: "2", State: conn.Online, LocationField: "2"},
17+
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "2"},
1818
}
19-
rr := PreferLocalDC(RoundRobin())
19+
rr := PreferLocalDC(Default())
2020
require.False(t, rr.AllowFalback)
21-
require.Equal(t, []conn.Conn{conns[1], conns[2]}, applyPreferFilter(rr, conns))
21+
require.Equal(t, []conn.Conn{conns[1], conns[2]}, applyPreferFilter(routerconfig.Info{SelfLocation: "2"}, rr, conns))
2222
}
2323

2424
func TestPreferLocalDCWithFallBack(t *testing.T) {
2525
conns := []conn.Conn{
26-
&mock.Conn{AddrField: "1", LocalDCField: false},
27-
&mock.Conn{AddrField: "2", State: conn.Online, LocalDCField: true},
28-
&mock.Conn{AddrField: "3", State: conn.Online, LocalDCField: true},
26+
&mock.Conn{AddrField: "1", LocationField: "1"},
27+
&mock.Conn{AddrField: "2", State: conn.Online, LocationField: "2"},
28+
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "2"},
2929
}
30-
rr := PreferLocalDCWithFallBack(RoundRobin())
30+
rr := PreferLocalDCWithFallBack(Default())
3131
require.True(t, rr.AllowFalback)
32-
require.Equal(t, []conn.Conn{conns[1], conns[2]}, applyPreferFilter(rr, conns))
32+
require.Equal(t, []conn.Conn{conns[1], conns[2]}, applyPreferFilter(routerconfig.Info{SelfLocation: "2"}, rr, conns))
3333
}
3434

3535
func TestPreferLocations(t *testing.T) {
@@ -39,9 +39,9 @@ func TestPreferLocations(t *testing.T) {
3939
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "two"},
4040
}
4141

42-
rr := PreferLocations(RoundRobin(), "zero", "two")
42+
rr := PreferLocations(Default(), "zero", "two")
4343
require.False(t, rr.AllowFalback)
44-
require.Equal(t, []conn.Conn{conns[0], conns[2]}, applyPreferFilter(rr, conns))
44+
require.Equal(t, []conn.Conn{conns[0], conns[2]}, applyPreferFilter(routerconfig.Info{}, rr, conns))
4545
}
4646

4747
func TestPreferLocationsWithFallback(t *testing.T) {
@@ -51,18 +51,18 @@ func TestPreferLocationsWithFallback(t *testing.T) {
5151
&mock.Conn{AddrField: "3", State: conn.Online, LocationField: "two"},
5252
}
5353

54-
rr := PreferLocationsWithFallback(RoundRobin(), "zero", "two")
54+
rr := PreferLocationsWithFallback(Default(), "zero", "two")
5555
require.True(t, rr.AllowFalback)
56-
require.Equal(t, []conn.Conn{conns[0], conns[2]}, applyPreferFilter(rr, conns))
56+
require.Equal(t, []conn.Conn{conns[0], conns[2]}, applyPreferFilter(routerconfig.Info{}, rr, conns))
5757
}
5858

59-
func applyPreferFilter(b *routerconfig.Config, conns []conn.Conn) []conn.Conn {
59+
func applyPreferFilter(info routerconfig.Info, b *routerconfig.Config, conns []conn.Conn) []conn.Conn {
6060
if b.IsPreferConn == nil {
61-
b.IsPreferConn = func(c conn.Conn) bool { return true }
61+
b.IsPreferConn = func(routerInfo routerconfig.Info, c conn.Conn) bool { return true }
6262
}
6363
res := make([]conn.Conn, 0, len(conns))
6464
for _, c := range conns {
65-
if b.IsPreferConn(c) {
65+
if b.IsPreferConn(info, c) {
6666
res = append(res, c)
6767
}
6868
}

balancers/config_test.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,13 @@ func TestFromConfig(t *testing.T) {
5050
"type": "random_choice",
5151
"prefer": "local_dc"
5252
}`,
53-
res: routerconfig.Config{IsPreferConn: func(c conn.Conn) bool {
54-
// some non nil func
55-
return false
56-
}},
53+
res: routerconfig.Config{
54+
DetectlocalDC: true,
55+
IsPreferConn: func(routerInfo routerconfig.Info, c conn.Conn) bool {
56+
// some non nil func
57+
return false
58+
},
59+
},
5760
},
5861
{
5962
name: "prefer_unknown_type",
@@ -71,8 +74,9 @@ func TestFromConfig(t *testing.T) {
7174
"fallback": true
7275
}`,
7376
res: routerconfig.Config{
74-
AllowFalback: true,
75-
IsPreferConn: func(c conn.Conn) bool {
77+
AllowFalback: true,
78+
DetectlocalDC: true,
79+
IsPreferConn: func(routerInfo routerconfig.Info, c conn.Conn) bool {
7680
// some non nil func
7781
return false
7882
},
@@ -86,7 +90,7 @@ func TestFromConfig(t *testing.T) {
8690
"locations": ["AAA", "BBB", "CCC"]
8791
}`,
8892
res: routerconfig.Config{
89-
IsPreferConn: func(c conn.Conn) bool {
93+
IsPreferConn: func(routerInfo routerconfig.Info, c conn.Conn) bool {
9094
// some non nil func
9195
return false
9296
},
@@ -102,7 +106,7 @@ func TestFromConfig(t *testing.T) {
102106
}`,
103107
res: routerconfig.Config{
104108
AllowFalback: true,
105-
IsPreferConn: func(c conn.Conn) bool {
109+
IsPreferConn: func(routerInfo routerconfig.Info, c conn.Conn) bool {
106110
// some non nil func
107111
return false
108112
},

config/config.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type Config struct {
2828
trace trace.Driver
2929
dialTimeout time.Duration
3030
connectionTTL time.Duration
31-
balancer *routerconfig.Config
31+
routerConfig *routerconfig.Config
3232
secure bool
3333
endpoint string
3434
database string
@@ -106,7 +106,7 @@ func (c Config) Trace() trace.Driver {
106106
// Balancer is an optional configuration related to selected balancer.
107107
// That is, some balancing methods allow to be configured.
108108
func (c Config) Balancer() *routerconfig.Config {
109-
return c.balancer
109+
return c.routerConfig
110110
}
111111

112112
// RequestsType set an additional type hint to all requests.
@@ -231,7 +231,7 @@ func WithDialTimeout(timeout time.Duration) Option {
231231

232232
func WithBalancer(balancer *routerconfig.Config) Option {
233233
return func(c *Config) {
234-
c.balancer = balancer
234+
c.routerConfig = balancer
235235
}
236236
}
237237

@@ -312,8 +312,8 @@ func defaultConfig() (c Config) {
312312
credentials: credentials.NewAnonymousCredentials(
313313
credentials.WithSourceInfo("default"),
314314
),
315-
balancer: balancers.Default(),
316-
tlsConfig: defaultTLSConfig(),
315+
routerConfig: balancers.Default(),
316+
tlsConfig: defaultTLSConfig(),
317317
grpcOptions: []grpc.DialOption{
318318
grpc.WithContextDialer(
319319
func(ctx context.Context, address string) (net.Conn, error) {

connection.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,17 @@ func open(ctx context.Context, opts ...Option) (_ Connection, err error) {
390390
return nil, xerrors.WithStackTrace(errors.New("configuration: empty database"))
391391
}
392392

393+
onDone := trace.DriverOnInit(
394+
c.config.Trace(),
395+
&ctx,
396+
c.config.Endpoint(),
397+
c.config.Database(),
398+
c.config.Secure(),
399+
)
400+
defer func() {
401+
onDone(err)
402+
}()
403+
393404
if c.pool == nil {
394405
c.pool = conn.NewPool(
395406
ctx,

internal/ctxlabels/ctxlabels.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package ctxlabels
2+
3+
import "context"
4+
5+
type localDcKey struct{}
6+
7+
func WithLocalDC(ctx context.Context, dc string) context.Context {
8+
return context.WithValue(ctx, localDcKey{}, dc)
9+
}
10+
11+
func ExtractLocalDC(ctx context.Context) string {
12+
if val := ctx.Value(localDcKey{}); val != nil {
13+
return val.(string)
14+
}
15+
return ""
16+
}
Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
package routerconfig
22

3-
// Dedicated package need for prevent cyclo dependencies config -> router -> config
3+
import "github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
44

5-
import (
6-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
7-
)
5+
// Dedicated package need for prevent cyclo dependencies config -> router -> config
86

97
type Config struct {
10-
IsPreferConn PreferConnFunc
11-
AllowFalback bool
12-
SingleConn bool
8+
IsPreferConn PreferConnFunc
9+
AllowFalback bool
10+
SingleConn bool
11+
DetectlocalDC bool
12+
}
13+
14+
type Info struct {
15+
SelfLocation string
1316
}
1417

15-
type PreferConnFunc func(c conn.Conn) bool
18+
type PreferConnFunc func(routerInfo Info, c conn.Conn) bool

internal/router/connections_state.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type connectionsState struct {
2222
func newConnectionsState(
2323
conns []conn.Conn,
2424
preferFunc routerconfig.PreferConnFunc,
25+
routerInfo routerconfig.Info,
2526
allowFallback bool,
2627
) *connectionsState {
2728
res := &connectionsState{
@@ -33,7 +34,7 @@ func newConnectionsState(
3334
),
3435
}
3536

36-
res.prefer, res.fallback = sortPreferConnections(conns, preferFunc, allowFallback)
37+
res.prefer, res.fallback = sortPreferConnections(conns, preferFunc, routerInfo, allowFallback)
3738
if allowFallback {
3839
res.all = conns
3940
} else {
@@ -130,6 +131,7 @@ func connsToNodeIDMap(conns []conn.Conn) (res map[uint32]conn.Conn) {
130131
func sortPreferConnections(
131132
conns []conn.Conn,
132133
preferFunc routerconfig.PreferConnFunc,
134+
routerInfo routerconfig.Info,
133135
allowFallback bool,
134136
) (prefer []conn.Conn, fallback []conn.Conn) {
135137
if preferFunc == nil {
@@ -142,7 +144,7 @@ func sortPreferConnections(
142144
}
143145

144146
for _, c := range conns {
145-
if preferFunc(c) {
147+
if preferFunc(routerInfo, c) {
146148
prefer = append(prefer, c)
147149
} else if allowFallback {
148150
fallback = append(fallback, c)

0 commit comments

Comments
 (0)