Skip to content

Commit 4a26a17

Browse files
authored
Merge pull request #1226 from ydb-platform/fqdn-mutator
* Added `ydb.WithNodeAddressMutator` option for mutate original node …
2 parents 9d49511 + 2dbf057 commit 4a26a17

File tree

7 files changed

+347
-33
lines changed

7 files changed

+347
-33
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Added `ydb.WithNodeAddressMutator` experimental option for mutate node addresses from `discovery.ListEndpoints` response
12
* Added type assertion checks to enhance type safety and prevent unexpected panics in critical sections of the codebase
23

34
## v3.66.3

internal/discovery/config/config.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package config
33
import (
44
"time"
55

6+
"github.com/jonboulle/clockwork"
7+
68
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
79
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
810
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
@@ -15,10 +17,12 @@ const (
1517
type Config struct {
1618
config.Common
1719

18-
endpoint string
19-
database string
20-
secure bool
21-
meta *meta.Meta
20+
endpoint string
21+
database string
22+
secure bool
23+
meta *meta.Meta
24+
addressMutator func(address string) string
25+
clock clockwork.Clock
2226

2327
interval time.Duration
2428
trace *trace.Discovery
@@ -28,6 +32,10 @@ func New(opts ...Option) *Config {
2832
c := &Config{
2933
interval: DefaultInterval,
3034
trace: &trace.Discovery{},
35+
addressMutator: func(address string) string {
36+
return address
37+
},
38+
clock: clockwork.NewRealClock(),
3139
}
3240
for _, opt := range opts {
3341
if opt != nil {
@@ -38,10 +46,18 @@ func New(opts ...Option) *Config {
3846
return c
3947
}
4048

49+
func (c *Config) MutateAddress(fqdn string) string {
50+
return c.addressMutator(fqdn)
51+
}
52+
4153
func (c *Config) Meta() *meta.Meta {
4254
return c.meta
4355
}
4456

57+
func (c *Config) Clock() clockwork.Clock {
58+
return c.clock
59+
}
60+
4561
func (c *Config) Interval() time.Duration {
4662
return c.interval
4763
}
@@ -85,6 +101,18 @@ func WithDatabase(database string) Option {
85101
}
86102
}
87103

104+
func WithClock(clock clockwork.Clock) Option {
105+
return func(c *Config) {
106+
c.clock = clock
107+
}
108+
}
109+
110+
func WithAddressMutator(addressMutator func(address string) string) Option {
111+
return func(c *Config) {
112+
c.addressMutator = addressMutator
113+
}
114+
}
115+
88116
// WithSecure set flag for secure connection
89117
func WithSecure(ssl bool) Option {
90118
return func(c *Config) {

internal/discovery/discovery.go

Lines changed: 49 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2020
)
2121

22+
//go:generate mockgen -destination grpc_client_mock_test.go -package discovery -write_package_comment=false github.com/ydb-platform/ydb-go-genproto/Ydb_Discovery_V1 DiscoveryServiceClient
23+
2224
func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) *Client {
2325
return &Client{
2426
config: config,
@@ -35,65 +37,85 @@ type Client struct {
3537
client Ydb_Discovery_V1.DiscoveryServiceClient
3638
}
3739

38-
// Discover cluster endpoints
39-
func (c *Client) Discover(ctx context.Context) (endpoints []endpoint.Endpoint, err error) {
40+
func discover(
41+
ctx context.Context,
42+
client Ydb_Discovery_V1.DiscoveryServiceClient,
43+
config *config.Config,
44+
) (endpoints []endpoint.Endpoint, location string, err error) {
4045
var (
41-
onDone = trace.DiscoveryOnDiscover(
42-
c.config.Trace(), &ctx,
43-
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/discovery.(*Client).Discover"),
44-
c.config.Endpoint(), c.config.Database(),
45-
)
4646
request = Ydb_Discovery.ListEndpointsRequest{
47-
Database: c.config.Database(),
47+
Database: config.Database(),
4848
}
4949
response *Ydb_Discovery.ListEndpointsResponse
5050
result Ydb_Discovery.ListEndpointsResult
51-
location string
5251
)
53-
defer func() {
54-
nodes := make([]trace.EndpointInfo, 0, len(endpoints))
55-
for _, e := range endpoints {
56-
nodes = append(nodes, e.Copy())
57-
}
58-
onDone(location, nodes, err)
59-
}()
60-
61-
ctx, err = c.config.Meta().Context(ctx)
62-
if err != nil {
63-
return nil, xerrors.WithStackTrace(err)
64-
}
6552

66-
response, err = c.client.ListEndpoints(ctx, &request)
53+
response, err = client.ListEndpoints(ctx, &request)
6754
if err != nil {
68-
return nil, xerrors.WithStackTrace(err)
55+
return nil, location, xerrors.WithStackTrace(err)
6956
}
7057

7158
if response.GetOperation().GetStatus() != Ydb.StatusIds_SUCCESS {
72-
return nil, xerrors.WithStackTrace(
59+
return nil, location, xerrors.WithStackTrace(
7360
xerrors.FromOperation(response.GetOperation()),
7461
)
7562
}
7663

7764
err = response.GetOperation().GetResult().UnmarshalTo(&result)
7865
if err != nil {
79-
return nil, xerrors.WithStackTrace(err)
66+
return nil, location, xerrors.WithStackTrace(err)
8067
}
8168

8269
location = result.GetSelfLocation()
8370
endpoints = make([]endpoint.Endpoint, 0, len(result.GetEndpoints()))
8471
for _, e := range result.GetEndpoints() {
85-
if e.GetSsl() == c.config.Secure() {
72+
if e.GetSsl() == config.Secure() {
8673
endpoints = append(endpoints, endpoint.New(
87-
net.JoinHostPort(e.GetAddress(), strconv.Itoa(int(e.GetPort()))),
74+
net.JoinHostPort(
75+
config.MutateAddress(e.GetAddress()),
76+
strconv.Itoa(int(e.GetPort())),
77+
),
8878
endpoint.WithLocation(e.GetLocation()),
8979
endpoint.WithID(e.GetNodeId()),
9080
endpoint.WithLoadFactor(e.GetLoadFactor()),
9181
endpoint.WithLocalDC(e.GetLocation() == location),
9282
endpoint.WithServices(e.GetService()),
83+
endpoint.WithLastUpdated(config.Clock().Now()),
9384
))
9485
}
9586
}
9687

88+
return endpoints, result.GetSelfLocation(), nil
89+
}
90+
91+
// Discover cluster endpoints
92+
func (c *Client) Discover(ctx context.Context) (endpoints []endpoint.Endpoint, finalErr error) {
93+
var (
94+
onDone = trace.DiscoveryOnDiscover(
95+
c.config.Trace(), &ctx,
96+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/discovery.(*Client).Discover"),
97+
c.config.Endpoint(), c.config.Database(),
98+
)
99+
location string
100+
)
101+
defer func() {
102+
nodes := make([]trace.EndpointInfo, 0, len(endpoints))
103+
for _, e := range endpoints {
104+
nodes = append(nodes, e.Copy())
105+
}
106+
onDone(location, nodes, finalErr)
107+
}()
108+
109+
ctx, err := c.config.Meta().Context(ctx)
110+
if err != nil {
111+
return nil, xerrors.WithStackTrace(err)
112+
}
113+
114+
endpoints, location, err = discover(ctx, c.client, c.config)
115+
if err != nil {
116+
return nil, xerrors.WithStackTrace(err)
117+
}
118+
97119
return endpoints, nil
98120
}
99121

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package discovery
2+
3+
import (
4+
"testing"
5+
6+
"github.com/jonboulle/clockwork"
7+
"github.com/stretchr/testify/require"
8+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
9+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Discovery"
10+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
11+
"go.uber.org/mock/gomock"
12+
grpcCodes "google.golang.org/grpc/codes"
13+
"google.golang.org/grpc/status"
14+
"google.golang.org/protobuf/types/known/anypb"
15+
16+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery/config"
17+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
18+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
19+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
20+
)
21+
22+
func must[T any](t T, err error) T {
23+
if err != nil {
24+
panic(err)
25+
}
26+
27+
return t
28+
}
29+
30+
func TestDiscover(t *testing.T) {
31+
t.Run("HappyWay", func(t *testing.T) {
32+
ctx := xtest.Context(t)
33+
ctrl := gomock.NewController(t)
34+
clock := clockwork.NewFakeClock()
35+
client := NewMockDiscoveryServiceClient(ctrl)
36+
client.EXPECT().ListEndpoints(gomock.Any(), &Ydb_Discovery.ListEndpointsRequest{
37+
Database: "test",
38+
}).Return(&Ydb_Discovery.ListEndpointsResponse{
39+
Operation: &Ydb_Operations.Operation{
40+
Ready: true,
41+
Status: Ydb.StatusIds_SUCCESS,
42+
Result: must(anypb.New(&Ydb_Discovery.ListEndpointsResult{
43+
Endpoints: []*Ydb_Discovery.EndpointInfo{
44+
{
45+
Address: "node1",
46+
Port: 1,
47+
Ssl: true,
48+
},
49+
{
50+
Address: "node2",
51+
Port: 2,
52+
Location: "AZ0",
53+
Ssl: true,
54+
},
55+
{
56+
Address: "node3",
57+
Port: 3,
58+
Ssl: false,
59+
},
60+
{
61+
Address: "node4",
62+
Port: 4,
63+
Location: "AZ0",
64+
Ssl: false,
65+
},
66+
},
67+
SelfLocation: "AZ0",
68+
})),
69+
},
70+
}, nil)
71+
endpoints, location, err := discover(ctx, client, config.New(
72+
config.WithDatabase("test"),
73+
config.WithSecure(false),
74+
config.WithClock(clock),
75+
))
76+
require.NoError(t, err)
77+
require.EqualValues(t, "AZ0", location)
78+
require.EqualValues(t, []endpoint.Endpoint{
79+
endpoint.New("node3:3",
80+
endpoint.WithLocalDC(false),
81+
endpoint.WithLastUpdated(clock.Now()),
82+
),
83+
endpoint.New("node4:4",
84+
endpoint.WithLocalDC(true),
85+
endpoint.WithLocation("AZ0"),
86+
endpoint.WithLastUpdated(clock.Now()),
87+
),
88+
}, endpoints)
89+
})
90+
t.Run("TransportError", func(t *testing.T) {
91+
ctx := xtest.Context(t)
92+
ctrl := gomock.NewController(t)
93+
client := NewMockDiscoveryServiceClient(ctrl)
94+
client.EXPECT().ListEndpoints(gomock.Any(), &Ydb_Discovery.ListEndpointsRequest{
95+
Database: "test",
96+
}).Return(nil, xerrors.Transport(status.Error(grpcCodes.Unavailable, "")))
97+
endpoints, location, err := discover(ctx, client, config.New(
98+
config.WithDatabase("test"),
99+
))
100+
require.Error(t, err)
101+
require.Empty(t, endpoints)
102+
require.Equal(t, "", location)
103+
require.True(t, xerrors.IsTransportError(err, grpcCodes.Unavailable))
104+
})
105+
t.Run("OperationError", func(t *testing.T) {
106+
ctx := xtest.Context(t)
107+
ctrl := gomock.NewController(t)
108+
client := NewMockDiscoveryServiceClient(ctrl)
109+
client.EXPECT().ListEndpoints(gomock.Any(), &Ydb_Discovery.ListEndpointsRequest{
110+
Database: "test",
111+
}).Return(&Ydb_Discovery.ListEndpointsResponse{
112+
Operation: &Ydb_Operations.Operation{
113+
Ready: true,
114+
Status: Ydb.StatusIds_UNAVAILABLE,
115+
},
116+
}, nil)
117+
endpoints, location, err := discover(ctx, client, config.New(
118+
config.WithDatabase("test"),
119+
))
120+
require.Error(t, err)
121+
require.Empty(t, endpoints)
122+
require.Equal(t, "", location)
123+
require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE))
124+
})
125+
t.Run("WithAddressMutator", func(t *testing.T) {
126+
ctx := xtest.Context(t)
127+
ctrl := gomock.NewController(t)
128+
clock := clockwork.NewFakeClock()
129+
client := NewMockDiscoveryServiceClient(ctrl)
130+
client.EXPECT().ListEndpoints(gomock.Any(), &Ydb_Discovery.ListEndpointsRequest{
131+
Database: "test",
132+
}).Return(&Ydb_Discovery.ListEndpointsResponse{
133+
Operation: &Ydb_Operations.Operation{
134+
Ready: true,
135+
Status: Ydb.StatusIds_SUCCESS,
136+
Result: must(anypb.New(&Ydb_Discovery.ListEndpointsResult{
137+
Endpoints: []*Ydb_Discovery.EndpointInfo{
138+
{
139+
Address: "node1",
140+
Port: 1,
141+
},
142+
{
143+
Address: "node2",
144+
Port: 2,
145+
Location: "AZ0",
146+
},
147+
},
148+
SelfLocation: "AZ0",
149+
})),
150+
},
151+
}, nil)
152+
endpoints, location, err := discover(ctx, client, config.New(
153+
config.WithDatabase("test"),
154+
config.WithAddressMutator(func(address string) string {
155+
return "u-" + address
156+
}),
157+
config.WithClock(clock),
158+
))
159+
require.NoError(t, err)
160+
require.EqualValues(t, "AZ0", location)
161+
require.EqualValues(t, []endpoint.Endpoint{
162+
endpoint.New("u-node1:1",
163+
endpoint.WithLocalDC(false),
164+
endpoint.WithLastUpdated(clock.Now()),
165+
),
166+
endpoint.New("u-node2:2",
167+
endpoint.WithLocalDC(true),
168+
endpoint.WithLocation("AZ0"),
169+
endpoint.WithLastUpdated(clock.Now()),
170+
),
171+
}, endpoints)
172+
})
173+
}

0 commit comments

Comments
 (0)