Skip to content

Commit 41673f4

Browse files
committed
added internal/balancer/state package
1 parent d90fdb7 commit 41673f4

File tree

3 files changed

+460
-0
lines changed

3 files changed

+460
-0
lines changed

internal/balancer/state/errors.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package state
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
7+
)
8+
9+
var (
10+
ErrNoEndpoints = xerrors.Wrap(fmt.Errorf("no endpoints"))
11+
ErrNilState = xerrors.Wrap(fmt.Errorf("nil state"))
12+
)

internal/balancer/state/state.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package state
2+
3+
import (
4+
"context"
5+
6+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
7+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
8+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xrand"
9+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xslices"
10+
)
11+
12+
type (
13+
state struct {
14+
filter func(e endpoint.Info) bool
15+
allowFallback bool
16+
17+
index map[uint32]endpoint.Endpoint
18+
19+
prefer []endpoint.Endpoint
20+
fallback []endpoint.Endpoint
21+
all []endpoint.Endpoint
22+
23+
rand xrand.Rand
24+
}
25+
option func(s *state)
26+
)
27+
28+
func WithFilter(filter func(e endpoint.Info) bool) option {
29+
return func(s *state) {
30+
s.filter = filter
31+
}
32+
}
33+
34+
func WithFallback() option {
35+
return func(s *state) {
36+
s.allowFallback = true
37+
}
38+
}
39+
40+
func withRand(rand xrand.Rand) option {
41+
return func(s *state) {
42+
s.rand = rand
43+
}
44+
}
45+
46+
func New(endpoints []endpoint.Endpoint, opts ...option) *state {
47+
s := &state{
48+
filter: func(e endpoint.Info) bool {
49+
return true
50+
},
51+
}
52+
53+
for _, opt := range opts {
54+
opt(s)
55+
}
56+
57+
if s.rand == nil {
58+
s.rand = xrand.New(xrand.WithLock())
59+
}
60+
61+
s.prefer, s.fallback = xslices.Split(endpoints, func(e endpoint.Endpoint) bool {
62+
return s.filter(e)
63+
})
64+
65+
if s.allowFallback {
66+
s.all = endpoints
67+
s.index = xslices.Map(endpoints, func(e endpoint.Endpoint) uint32 { return e.NodeID() })
68+
} else {
69+
s.all = s.prefer
70+
s.fallback = nil
71+
s.index = xslices.Map(s.prefer, func(e endpoint.Endpoint) uint32 { return e.NodeID() })
72+
}
73+
74+
return s
75+
}
76+
77+
func (s *state) All() (all []endpoint.Endpoint) {
78+
if s == nil {
79+
return nil
80+
}
81+
82+
return s.all
83+
}
84+
85+
func (s *state) Exclude(e endpoint.Endpoint) *state {
86+
return New(xslices.Filter(s.all, func(endpoint endpoint.Endpoint) bool {
87+
return e.Address() != endpoint.Address()
88+
}), withRand(s.rand))
89+
}
90+
91+
func (s *state) Next(ctx context.Context) (endpoint.Endpoint, error) {
92+
if s == nil {
93+
return nil, ErrNilState
94+
}
95+
96+
if err := ctx.Err(); err != nil {
97+
return nil, xerrors.WithStackTrace(err)
98+
}
99+
100+
if nodeID, wantEndpointByNodeID := endpoint.ContextNodeID(ctx); wantEndpointByNodeID {
101+
e, has := s.index[nodeID]
102+
if has {
103+
return e, nil
104+
}
105+
}
106+
107+
if l := len(s.prefer); l > 0 {
108+
return s.prefer[s.rand.Int(l)], nil
109+
}
110+
111+
if l := len(s.fallback); l > 0 {
112+
return s.fallback[s.rand.Int(l)], nil
113+
}
114+
115+
return nil, xerrors.WithStackTrace(ErrNoEndpoints)
116+
}

0 commit comments

Comments
 (0)