Skip to content

Commit 1ba3a4d

Browse files
authored
Merge pull request #892 from ydb-platform/added-dropped
added `Added` and `Dropped` fields into balancer update done info
2 parents 6373d76 + e3e3bd5 commit 1ba3a4d

File tree

6 files changed

+190
-19
lines changed

6 files changed

+190
-19
lines changed

internal/balancer/balancer.go

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package balancer
33
import (
44
"context"
55
"fmt"
6+
"sort"
67

78
"google.golang.org/grpc"
89

@@ -33,7 +34,7 @@ type discoveryClient interface {
3334

3435
type Balancer struct {
3536
driverConfig *config.Config
36-
balancerConfig balancerConfig.Config
37+
config balancerConfig.Config
3738
pool *conn.Pool
3839
discoveryClient discoveryClient
3940
discoveryRepeater repeater.Repeater
@@ -46,7 +47,7 @@ type Balancer struct {
4647
}
4748

4849
func (b *Balancer) HasNode(id uint32) bool {
49-
if b.balancerConfig.SingleConn {
50+
if b.config.SingleConn {
5051
return true
5152
}
5253
b.mu.RLock()
@@ -114,7 +115,7 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
114115
return xerrors.WithStackTrace(err)
115116
}
116117

117-
if b.balancerConfig.DetectLocalDC {
118+
if b.config.DetectLocalDC {
118119
localDC, err = b.localDCDetector(ctx, endpoints)
119120
if err != nil {
120121
return xerrors.WithStackTrace(err)
@@ -126,16 +127,52 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
126127
return nil
127128
}
128129

130+
func endpointsDiff(newestEndpoints []endpoint.Endpoint, previousConns []conn.Conn) (
131+
nodes []trace.EndpointInfo,
132+
added []trace.EndpointInfo,
133+
dropped []trace.EndpointInfo,
134+
) {
135+
nodes = make([]trace.EndpointInfo, 0, len(newestEndpoints))
136+
added = make([]trace.EndpointInfo, 0, len(previousConns))
137+
dropped = make([]trace.EndpointInfo, 0, len(previousConns))
138+
var (
139+
newestMap = make(map[string]struct{}, len(newestEndpoints))
140+
previousMap = make(map[string]struct{}, len(previousConns))
141+
)
142+
sort.Slice(newestEndpoints, func(i, j int) bool {
143+
return newestEndpoints[i].Address() < newestEndpoints[j].Address()
144+
})
145+
sort.Slice(previousConns, func(i, j int) bool {
146+
return previousConns[i].Endpoint().Address() < previousConns[j].Endpoint().Address()
147+
})
148+
for _, e := range previousConns {
149+
previousMap[e.Endpoint().Address()] = struct{}{}
150+
}
151+
for _, e := range newestEndpoints {
152+
nodes = append(nodes, e.Copy())
153+
newestMap[e.Address()] = struct{}{}
154+
if _, has := previousMap[e.Address()]; !has {
155+
added = append(added, e.Copy())
156+
}
157+
}
158+
for _, c := range previousConns {
159+
if _, has := newestMap[c.Endpoint().Address()]; !has {
160+
dropped = append(dropped, c.Endpoint().Copy())
161+
}
162+
}
163+
return nodes, added, dropped
164+
}
165+
129166
func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []endpoint.Endpoint, localDC string) {
130-
onDone := trace.DriverOnBalancerUpdate(
131-
b.driverConfig.Trace(), &ctx, stack.FunctionID(0), b.balancerConfig.DetectLocalDC,
167+
var (
168+
onDone = trace.DriverOnBalancerUpdate(
169+
b.driverConfig.Trace(), &ctx, stack.FunctionID(0), b.config.DetectLocalDC,
170+
)
171+
previousConns []conn.Conn
132172
)
133173
defer func() {
134-
nodes := make([]trace.EndpointInfo, 0, len(endpoints))
135-
for _, e := range endpoints {
136-
nodes = append(nodes, e.Copy())
137-
}
138-
onDone(nodes, localDC, nil)
174+
nodes, added, dropped := endpointsDiff(endpoints, previousConns)
175+
onDone(nodes, added, dropped, localDC, nil)
139176
}()
140177

141178
connections := endpointsToConnections(b.pool, endpoints)
@@ -145,14 +182,17 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end
145182
}
146183

147184
info := balancerConfig.Info{SelfLocation: localDC}
148-
state := newConnectionsState(connections, b.balancerConfig.Filter, info, b.balancerConfig.AllowFallback)
185+
state := newConnectionsState(connections, b.config.Filter, info, b.config.AllowFallback)
149186

150187
endpointsInfo := make([]endpoint.Info, len(endpoints))
151188
for i, e := range endpoints {
152189
endpointsInfo[i] = e
153190
}
154191

155192
b.mu.WithLock(func() {
193+
if b.connectionsState != nil {
194+
previousConns = b.connectionsState.all
195+
}
156196
b.connectionsState = state
157197
for _, onApplyDiscoveredEndpoints := range b.onApplyDiscoveredEndpoints {
158198
onApplyDiscoveredEndpoints(ctx, endpointsInfo)
@@ -216,12 +256,12 @@ func New(
216256
b.discoveryClient = d
217257

218258
if config := driverConfig.Balancer(); config == nil {
219-
b.balancerConfig = balancerConfig.Config{}
259+
b.config = balancerConfig.Config{}
220260
} else {
221-
b.balancerConfig = *config
261+
b.config = *config
222262
}
223263

224-
if b.balancerConfig.SingleConn {
264+
if b.config.SingleConn {
225265
b.applyDiscoveredEndpoints(ctx, []endpoint.Endpoint{
226266
endpoint.New(driverConfig.Endpoint()),
227267
}, "")

internal/balancer/balancer_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package balancer
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
8+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
9+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/mock"
11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
13+
)
14+
15+
func TestEndpointsDiff(t *testing.T) {
16+
for _, tt := range []struct {
17+
newestEndpoints []endpoint.Endpoint
18+
previousConns []conn.Conn
19+
nodes []trace.EndpointInfo
20+
added []trace.EndpointInfo
21+
dropped []trace.EndpointInfo
22+
}{
23+
{
24+
newestEndpoints: []endpoint.Endpoint{
25+
&mock.Endpoint{AddrField: "1"},
26+
&mock.Endpoint{AddrField: "3"},
27+
&mock.Endpoint{AddrField: "2"},
28+
&mock.Endpoint{AddrField: "0"},
29+
},
30+
previousConns: []conn.Conn{
31+
&mock.Conn{AddrField: "2"},
32+
&mock.Conn{AddrField: "1"},
33+
&mock.Conn{AddrField: "0"},
34+
&mock.Conn{AddrField: "3"},
35+
},
36+
nodes: []trace.EndpointInfo{
37+
&mock.Endpoint{AddrField: "0"},
38+
&mock.Endpoint{AddrField: "1"},
39+
&mock.Endpoint{AddrField: "2"},
40+
&mock.Endpoint{AddrField: "3"},
41+
},
42+
added: []trace.EndpointInfo{},
43+
dropped: []trace.EndpointInfo{},
44+
},
45+
{
46+
newestEndpoints: []endpoint.Endpoint{
47+
&mock.Endpoint{AddrField: "1"},
48+
&mock.Endpoint{AddrField: "3"},
49+
&mock.Endpoint{AddrField: "2"},
50+
&mock.Endpoint{AddrField: "0"},
51+
},
52+
previousConns: []conn.Conn{
53+
&mock.Conn{AddrField: "1"},
54+
&mock.Conn{AddrField: "0"},
55+
&mock.Conn{AddrField: "3"},
56+
},
57+
nodes: []trace.EndpointInfo{
58+
&mock.Endpoint{AddrField: "0"},
59+
&mock.Endpoint{AddrField: "1"},
60+
&mock.Endpoint{AddrField: "2"},
61+
&mock.Endpoint{AddrField: "3"},
62+
},
63+
added: []trace.EndpointInfo{
64+
&mock.Endpoint{AddrField: "2"},
65+
},
66+
dropped: []trace.EndpointInfo{},
67+
},
68+
{
69+
newestEndpoints: []endpoint.Endpoint{
70+
&mock.Endpoint{AddrField: "1"},
71+
&mock.Endpoint{AddrField: "3"},
72+
&mock.Endpoint{AddrField: "0"},
73+
},
74+
previousConns: []conn.Conn{
75+
&mock.Conn{AddrField: "1"},
76+
&mock.Conn{AddrField: "2"},
77+
&mock.Conn{AddrField: "0"},
78+
&mock.Conn{AddrField: "3"},
79+
},
80+
nodes: []trace.EndpointInfo{
81+
&mock.Endpoint{AddrField: "0"},
82+
&mock.Endpoint{AddrField: "1"},
83+
&mock.Endpoint{AddrField: "3"},
84+
},
85+
added: []trace.EndpointInfo{},
86+
dropped: []trace.EndpointInfo{
87+
&mock.Endpoint{AddrField: "2"},
88+
},
89+
},
90+
{
91+
newestEndpoints: []endpoint.Endpoint{
92+
&mock.Endpoint{AddrField: "1"},
93+
&mock.Endpoint{AddrField: "3"},
94+
&mock.Endpoint{AddrField: "0"},
95+
},
96+
previousConns: []conn.Conn{
97+
&mock.Conn{AddrField: "4"},
98+
&mock.Conn{AddrField: "7"},
99+
&mock.Conn{AddrField: "8"},
100+
},
101+
nodes: []trace.EndpointInfo{
102+
&mock.Endpoint{AddrField: "0"},
103+
&mock.Endpoint{AddrField: "1"},
104+
&mock.Endpoint{AddrField: "3"},
105+
},
106+
added: []trace.EndpointInfo{
107+
&mock.Endpoint{AddrField: "0"},
108+
&mock.Endpoint{AddrField: "1"},
109+
&mock.Endpoint{AddrField: "3"},
110+
},
111+
dropped: []trace.EndpointInfo{
112+
&mock.Endpoint{AddrField: "4"},
113+
&mock.Endpoint{AddrField: "7"},
114+
&mock.Endpoint{AddrField: "8"},
115+
},
116+
},
117+
} {
118+
t.Run(xtest.CurrentFileLine(), func(t *testing.T) {
119+
nodes, added, dropped := endpointsDiff(tt.newestEndpoints, tt.previousConns)
120+
require.Equal(t, tt.nodes, nodes)
121+
require.Equal(t, tt.added, added)
122+
require.Equal(t, tt.dropped, dropped)
123+
})
124+
}
125+
}

internal/balancer/local_dc_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,9 @@ func TestLocalDCDiscovery(t *testing.T) {
134134
config.WithBalancer(balancers.PreferLocalDC(balancers.Default())),
135135
)
136136
r := &Balancer{
137-
driverConfig: cfg,
138-
balancerConfig: *cfg.Balancer(),
139-
pool: conn.NewPool(context.Background(), cfg),
137+
driverConfig: cfg,
138+
config: *cfg.Balancer(),
139+
pool: conn.NewPool(context.Background(), cfg),
140140
discoveryClient: discoveryMock{endpoints: []endpoint.Endpoint{
141141
&mock.Endpoint{AddrField: "a:123", LocationField: "a"},
142142
&mock.Endpoint{AddrField: "b:234", LocationField: "b"},

log/driver.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,8 @@ func internalDriver(l *wrapper, d trace.Detailer) (t trace.Driver) { //nolint:go
430430
l.Log(ctx, "done",
431431
latencyField(start),
432432
Stringer("endpoints", endpoints(info.Endpoints)),
433+
Stringer("added", endpoints(info.Added)),
434+
Stringer("dropped", endpoints(info.Dropped)),
433435
String("detectedLocalDC", info.LocalDC),
434436
)
435437
}

trace/driver.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ type (
165165
}
166166
DriverBalancerUpdateDoneInfo struct {
167167
Endpoints []EndpointInfo
168+
Added []EndpointInfo
169+
Dropped []EndpointInfo
168170
LocalDC string
169171
// Deprecated: this field always nil
170172
Error error

trace/driver_gtrace.go

Lines changed: 4 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)