Skip to content

Commit 98a04d2

Browse files
authored
Merge pull request #1344 from ydb-platform/balancer-endpoints-diff
switched balancer private func endpointsDiff to xslices.Diff
2 parents 13fe198 + 32f0f7d commit 98a04d2

File tree

3 files changed

+36
-181
lines changed

3 files changed

+36
-181
lines changed

internal/balancer/balancer.go

Lines changed: 23 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package balancer
33
import (
44
"context"
55
"fmt"
6-
"sort"
6+
"strings"
77

88
"google.golang.org/grpc"
99

@@ -19,6 +19,7 @@ import (
1919
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
2020
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
2121
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
22+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xslices"
2223
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
2324
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
2425
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
@@ -121,59 +122,29 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
121122
return nil
122123
}
123124

124-
func endpointsDiff(newestEndpoints []endpoint.Endpoint, previousConns []conn.Conn) (
125-
nodes []trace.EndpointInfo,
126-
added []trace.EndpointInfo,
127-
dropped []trace.EndpointInfo,
128-
) {
129-
nodes = make([]trace.EndpointInfo, 0, len(newestEndpoints))
130-
added = make([]trace.EndpointInfo, 0, len(previousConns))
131-
dropped = make([]trace.EndpointInfo, 0, len(previousConns))
132-
var (
133-
newestMap = make(map[string]struct{}, len(newestEndpoints))
134-
previousMap = make(map[string]struct{}, len(previousConns))
135-
)
136-
sort.Slice(newestEndpoints, func(i, j int) bool {
137-
return newestEndpoints[i].Address() < newestEndpoints[j].Address()
138-
})
139-
sort.Slice(previousConns, func(i, j int) bool {
140-
return previousConns[i].Endpoint().Address() < previousConns[j].Endpoint().Address()
141-
})
142-
for _, e := range previousConns {
143-
previousMap[e.Endpoint().Address()] = struct{}{}
144-
}
145-
for _, e := range newestEndpoints {
146-
nodes = append(nodes, e.Copy())
147-
newestMap[e.Address()] = struct{}{}
148-
if _, has := previousMap[e.Address()]; !has {
149-
added = append(added, e.Copy())
150-
}
151-
}
152-
for _, c := range previousConns {
153-
if _, has := newestMap[c.Endpoint().Address()]; !has {
154-
dropped = append(dropped, c.Endpoint().Copy())
155-
}
156-
}
157-
158-
return nodes, added, dropped
159-
}
160-
161-
func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []endpoint.Endpoint, localDC string) {
125+
func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoint.Endpoint, localDC string) {
162126
var (
163127
onDone = trace.DriverOnBalancerUpdate(
164128
b.driverConfig.Trace(), &ctx,
165129
stack.FunctionID(
166130
"github.com/ydb-platform/ydb-go-sdk/3/internal/balancer.(*Balancer).applyDiscoveredEndpoints"),
167131
b.config.DetectLocalDC,
168132
)
169-
previousConns []conn.Conn
133+
previous = b.connections().All()
170134
)
171135
defer func() {
172-
nodes, added, dropped := endpointsDiff(endpoints, previousConns)
173-
onDone(nodes, added, dropped, localDC)
136+
_, added, dropped := xslices.Diff(previous, newest, func(lhs, rhs endpoint.Endpoint) int {
137+
return strings.Compare(lhs.Address(), rhs.Address())
138+
})
139+
onDone(
140+
xslices.Transform(newest, func(t endpoint.Endpoint) trace.EndpointInfo { return t }),
141+
xslices.Transform(added, func(t endpoint.Endpoint) trace.EndpointInfo { return t }),
142+
xslices.Transform(dropped, func(t endpoint.Endpoint) trace.EndpointInfo { return t }),
143+
localDC,
144+
)
174145
}()
175146

176-
connections := endpointsToConnections(b.pool, endpoints)
147+
connections := endpointsToConnections(b.pool, newest)
177148
for _, c := range connections {
178149
b.pool.Allow(ctx, c)
179150
c.Endpoint().Touch()
@@ -182,15 +153,12 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end
182153
info := balancerConfig.Info{SelfLocation: localDC}
183154
state := newConnectionsState(connections, b.config.Filter, info, b.config.AllowFallback)
184155

185-
endpointsInfo := make([]endpoint.Info, len(endpoints))
186-
for i, e := range endpoints {
156+
endpointsInfo := make([]endpoint.Info, len(newest))
157+
for i, e := range newest {
187158
endpointsInfo[i] = e
188159
}
189160

190161
b.mu.WithLock(func() {
191-
if b.connectionsState != nil {
192-
previousConns = b.connectionsState.all
193-
}
194162
b.connectionsState = state
195163
for _, onApplyDiscoveredEndpoints := range b.onApplyDiscoveredEndpoints {
196164
onApplyDiscoveredEndpoints(ctx, endpointsInfo)
@@ -243,15 +211,14 @@ func New(
243211
}()
244212

245213
b = &Balancer{
246-
driverConfig: driverConfig,
247-
pool: pool,
248-
localDCDetector: detectLocalDC,
214+
driverConfig: driverConfig,
215+
pool: pool,
216+
discoveryClient: internalDiscovery.New(ctx, pool.Get(
217+
endpoint.New(driverConfig.Endpoint()),
218+
), discoveryConfig),
219+
connectionsState: &connectionsState{},
220+
localDCDetector: detectLocalDC,
249221
}
250-
d := internalDiscovery.New(ctx, pool.Get(
251-
endpoint.New(driverConfig.Endpoint()),
252-
), discoveryConfig)
253-
254-
b.discoveryClient = d
255222

256223
if config := driverConfig.Balancer(); config == nil {
257224
b.config = balancerConfig.Config{}

internal/balancer/balancer_test.go

Lines changed: 0 additions & 125 deletions
This file was deleted.

internal/balancer/connections_state.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,19 @@ func (s *connectionsState) PreferredCount() int {
4444
return len(s.prefer)
4545
}
4646

47+
func (s *connectionsState) All() (all []endpoint.Endpoint) {
48+
if s == nil {
49+
return nil
50+
}
51+
52+
all = make([]endpoint.Endpoint, len(s.all))
53+
for i, c := range s.all {
54+
all[i] = c.Endpoint()
55+
}
56+
57+
return all
58+
}
59+
4760
func (s *connectionsState) GetConnection(ctx context.Context) (_ conn.Conn, failedCount int) {
4861
if err := ctx.Err(); err != nil {
4962
return nil, 0

0 commit comments

Comments
 (0)