Skip to content

Commit b7c2eb5

Browse files
authored
Merge pull request #1356 from ydb-platform/cluster
renamed internal/balancer/state => internal/balancer/cluster
2 parents 4225cd6 + f5952cb commit b7c2eb5

File tree

3 files changed

+104
-56
lines changed

3 files changed

+104
-56
lines changed

internal/balancer/state/state.go renamed to internal/balancer/cluster/cluster.go

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package state
1+
package cluster
22

33
import (
44
"context"
@@ -10,7 +10,7 @@ import (
1010
)
1111

1212
type (
13-
state struct {
13+
Cluster struct {
1414
filter func(e endpoint.Info) bool
1515
allowFallback bool
1616

@@ -22,29 +22,23 @@ type (
2222

2323
rand xrand.Rand
2424
}
25-
option func(s *state)
25+
option func(s *Cluster)
2626
)
2727

2828
func WithFilter(filter func(e endpoint.Info) bool) option {
29-
return func(s *state) {
29+
return func(s *Cluster) {
3030
s.filter = filter
3131
}
3232
}
3333

34-
func WithFallback() option {
35-
return func(s *state) {
36-
s.allowFallback = true
34+
func WithFallback(allowFallback bool) option {
35+
return func(s *Cluster) {
36+
s.allowFallback = allowFallback
3737
}
3838
}
3939

40-
func withRand(rand xrand.Rand) option {
41-
return func(s *state) {
42-
s.rand = rand
43-
}
44-
}
45-
46-
func New(endpoints []endpoint.Endpoint, opts ...option) *state {
47-
s := &state{
40+
func New(endpoints []endpoint.Endpoint, opts ...option) *Cluster {
41+
s := &Cluster{
4842
filter: func(e endpoint.Info) bool {
4943
return true
5044
},
@@ -74,23 +68,41 @@ func New(endpoints []endpoint.Endpoint, opts ...option) *state {
7468
return s
7569
}
7670

77-
func (s *state) All() (all []endpoint.Endpoint) {
71+
func (s *Cluster) All() (all []endpoint.Endpoint) {
7872
if s == nil {
7973
return nil
8074
}
8175

8276
return s.all
8377
}
8478

85-
func (s *state) Exclude(e endpoint.Endpoint) *state {
86-
return New(xslices.Filter(s.all, func(endpoint endpoint.Endpoint) bool {
87-
return e.Address() != endpoint.Address()
88-
}), withRand(s.rand))
79+
func Without(s *Cluster, endpoints ...endpoint.Endpoint) *Cluster {
80+
prefer := make([]endpoint.Endpoint, 0, len(s.prefer))
81+
fallback := s.fallback
82+
for _, endpoint := range endpoints {
83+
for i := range s.prefer {
84+
if s.prefer[i].Address() != endpoint.Address() {
85+
prefer = append(prefer, s.prefer[i])
86+
} else {
87+
fallback = append(fallback, s.prefer[i])
88+
}
89+
}
90+
}
91+
92+
return &Cluster{
93+
filter: s.filter,
94+
allowFallback: s.allowFallback,
95+
index: s.index,
96+
prefer: prefer,
97+
fallback: fallback,
98+
all: s.all,
99+
rand: s.rand,
100+
}
89101
}
90102

91-
func (s *state) Next(ctx context.Context) (endpoint.Endpoint, error) {
103+
func (s *Cluster) Next(ctx context.Context) (endpoint.Endpoint, error) {
92104
if s == nil {
93-
return nil, ErrNilState
105+
return nil, ErrNilPtr
94106
}
95107

96108
if err := ctx.Err(); err != nil {

internal/balancer/state/state_test.go renamed to internal/balancer/cluster/cluster_test.go

Lines changed: 68 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package state
1+
package cluster
22

33
import (
44
"context"
@@ -11,20 +11,19 @@ import (
1111

1212
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1313
"github.com/ydb-platform/ydb-go-sdk/v3/internal/mock"
14-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xslices"
1514
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
1615
)
1716

18-
func TestState(t *testing.T) {
17+
func TestCluster(t *testing.T) {
1918
ctx := xtest.Context(t)
2019

2120
t.Run("Nil", func(t *testing.T) {
22-
var s *state
21+
var s *Cluster
2322

2423
require.Empty(t, s.All())
2524

2625
e, err := s.Next(ctx)
27-
require.ErrorIs(t, err, ErrNilState)
26+
require.ErrorIs(t, err, ErrNilPtr)
2827
require.Nil(t, e)
2928
})
3029

@@ -69,7 +68,7 @@ func TestState(t *testing.T) {
6968
require.Nil(t, e)
7069
})
7170

72-
t.Run("Exclude", func(t *testing.T) {
71+
t.Run("Without", func(t *testing.T) {
7372
s := New([]endpoint.Endpoint{
7473
&mock.Endpoint{
7574
AddrField: "1",
@@ -93,35 +92,72 @@ func TestState(t *testing.T) {
9392
},
9493
})
9594

96-
var endpoints []endpoint.Endpoint
95+
{ // initial state
96+
require.Len(t, s.All(), 5)
97+
require.Len(t, s.index, 5)
98+
require.Len(t, s.prefer, 5)
99+
}
97100

98-
for i := 0; i < 5; i++ {
101+
{ // without first endpoint
99102
e, err := s.Next(ctx)
100103
require.NoError(t, err)
101104
require.NotNil(t, e)
102-
endpoints = append(endpoints, e)
103-
s = s.Exclude(e)
104-
require.Len(t, s.All(), 4-i)
105+
s = Without(s, e)
106+
require.Len(t, s.All(), 5)
107+
require.Len(t, s.index, 5)
108+
require.Len(t, s.prefer, 4)
109+
require.Len(t, s.fallback, 1)
105110
}
106111

107-
e, err := s.Next(ctx)
108-
require.ErrorIs(t, err, ErrNoEndpoints)
109-
require.Nil(t, e)
110-
require.Empty(t, s.All())
112+
{ // without second endpoint
113+
e, err := s.Next(ctx)
114+
require.NoError(t, err)
115+
require.NotNil(t, e)
116+
s = Without(s, e)
117+
require.Len(t, s.All(), 5)
118+
require.Len(t, s.index, 5)
119+
require.Len(t, s.prefer, 3)
120+
require.Len(t, s.fallback, 2)
121+
}
111122

112-
require.Equal(t,
113-
[]uint32{1, 2, 3, 4, 5},
114-
xslices.SortCopy(
115-
xslices.Transform(
116-
endpoints,
117-
func(e endpoint.Endpoint) uint32 {
118-
return e.NodeID()
119-
},
120-
), func(lhs, rhs uint32) int {
121-
return int(lhs) - int(rhs)
122-
},
123-
),
124-
)
123+
{ // without third endpoint
124+
e, err := s.Next(ctx)
125+
require.NoError(t, err)
126+
require.NotNil(t, e)
127+
s = Without(s, e)
128+
require.Len(t, s.All(), 5)
129+
require.Len(t, s.index, 5)
130+
require.Len(t, s.prefer, 2)
131+
require.Len(t, s.fallback, 3)
132+
}
133+
134+
{ // without fourth endpoint
135+
e, err := s.Next(ctx)
136+
require.NoError(t, err)
137+
require.NotNil(t, e)
138+
s = Without(s, e)
139+
require.Len(t, s.All(), 5)
140+
require.Len(t, s.index, 5)
141+
require.Len(t, s.prefer, 1)
142+
require.Len(t, s.fallback, 4)
143+
}
144+
145+
{ // without fifth endpoint
146+
e, err := s.Next(ctx)
147+
require.NoError(t, err)
148+
require.NotNil(t, e)
149+
s = Without(s, e)
150+
require.Len(t, s.All(), 5)
151+
require.Len(t, s.index, 5)
152+
require.Empty(t, s.prefer)
153+
require.Len(t, s.fallback, 5)
154+
}
155+
156+
{ // next from fallback is ok
157+
e, err := s.Next(ctx)
158+
require.NoError(t, err)
159+
require.NotNil(t, e)
160+
}
125161
})
126162

127163
t.Run("WithFilter", func(t *testing.T) {
@@ -173,7 +209,7 @@ func TestState(t *testing.T) {
173209
},
174210
}, WithFilter(func(e endpoint.Info) bool {
175211
return e.NodeID()%2 == 0
176-
}), WithFallback())
212+
}), WithFallback(true))
177213

178214
require.Len(t, s.index, 4)
179215
require.Len(t, s.All(), 4)
@@ -189,7 +225,7 @@ func TestState(t *testing.T) {
189225
},
190226
}, WithFilter(func(e endpoint.Info) bool {
191227
return false
192-
}), WithFallback())
228+
}), WithFallback(true))
193229

194230
require.Len(t, s.index, 1)
195231
require.Len(t, s.All(), 1)
@@ -236,7 +272,7 @@ func TestState(t *testing.T) {
236272
const (
237273
buckets = 10
238274
total = 1000000
239-
epsilon = int(float64(total) / float64(buckets) * 0.01)
275+
epsilon = int(float64(total) / float64(buckets) * 0.015)
240276
)
241277
endpoints := make([]endpoint.Endpoint, buckets)
242278

@@ -321,7 +357,7 @@ func benchmarkNextParallel(b *testing.B, parallelism int) {
321357
},
322358
}, WithFilter(func(e endpoint.Info) bool {
323359
return e.NodeID()%2 == 0
324-
}), WithFallback())
360+
}), WithFallback(true))
325361

326362
b.ReportAllocs()
327363

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package state
1+
package cluster
22

33
import (
44
"fmt"
@@ -8,5 +8,5 @@ import (
88

99
var (
1010
ErrNoEndpoints = xerrors.Wrap(fmt.Errorf("no endpoints"))
11-
ErrNilState = xerrors.Wrap(fmt.Errorf("nil state"))
11+
ErrNilPtr = xerrors.Wrap(fmt.Errorf("nil pointer"))
1212
)

0 commit comments

Comments
 (0)