Skip to content

Commit 1aa7283

Browse files
authored
Merge pull request #1557 from ydb-platform/force-close-conn-on-dns-errs
gRPC connection will be forcefully closed on DNS resolver errors from…
2 parents 6e8ca8f + 0e194a6 commit 1aa7283

File tree

7 files changed

+116
-121
lines changed

7 files changed

+116
-121
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
* Refactored coordination traces
2+
* gRPC connection will be forcefully closed on DNS resolver errors from now on
23

34
## v3.91.0
45
* Added `ydb.WithPreferredNodeID(ctx, nodeID)` context modifier for trying to execute queries on given nodeID

config/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func (c *Config) ExcludeGRPCCodesForPessimization() []grpcCodes.Code {
4848
// GrpcDialOptions reports about used grpc dialing options
4949
func (c *Config) GrpcDialOptions() []grpc.DialOption {
5050
return append(
51-
defaultGrpcOptions(c.trace, c.secure, c.tlsConfig),
51+
defaultGrpcOptions(c.secure, c.tlsConfig),
5252
c.grpcOptions...,
5353
)
5454
}

config/defaults.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/ydb-platform/ydb-go-sdk/v3/balancers"
1414
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
1515
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
16-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xresolver"
1716
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1817
)
1918

@@ -30,7 +29,7 @@ var (
3029
}
3130
)
3231

33-
func defaultGrpcOptions(t *trace.Driver, secure bool, tlsConfig *tls.Config) (opts []grpc.DialOption) {
32+
func defaultGrpcOptions(secure bool, tlsConfig *tls.Config) (opts []grpc.DialOption) {
3433
opts = append(opts,
3534
// keep-aliving all connections
3635
grpc.WithKeepaliveParams(
@@ -45,15 +44,6 @@ func defaultGrpcOptions(t *trace.Driver, secure bool, tlsConfig *tls.Config) (op
4544
grpc.MaxCallRecvMsgSize(DefaultGRPCMsgSize),
4645
grpc.MaxCallSendMsgSize(DefaultGRPCMsgSize),
4746
),
48-
// use proxy-resolvers
49-
// 1) for interpret schemas `ydb`, `grpc` and `grpcs` in node URLs as for dns resolver
50-
// 2) for observe resolving events
51-
grpc.WithResolvers(
52-
xresolver.New("", t),
53-
xresolver.New("ydb", t),
54-
xresolver.New("grpc", t),
55-
xresolver.New("grpcs", t),
56-
),
5747
)
5848
if secure {
5949
opts = append(opts, grpc.WithTransportCredentials(

driver_string_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func TestDriver_String(t *testing.T) {
2323
config.WithDatabase("local"),
2424
config.WithSecure(false),
2525
)},
26-
s: `Driver{Endpoint:"localhost",Database:"local",Secure:false,Credentials:Anonymous{From:"github.com/ydb-platform/ydb-go-sdk/v3/config.defaultConfig(defaults.go:90)"}}`, //nolint:lll
26+
s: `Driver{Endpoint:"localhost",Database:"local",Secure:false,Credentials:Anonymous{From:"github.com/ydb-platform/ydb-go-sdk/v3/config.defaultConfig(defaults.go:80)"}}`, //nolint:lll
2727
},
2828
{
2929
name: xtest.CurrentFileLine(),
@@ -32,7 +32,7 @@ func TestDriver_String(t *testing.T) {
3232
config.WithDatabase("local"),
3333
config.WithSecure(true),
3434
)},
35-
s: `Driver{Endpoint:"localhost",Database:"local",Secure:true,Credentials:Anonymous{From:"github.com/ydb-platform/ydb-go-sdk/v3/config.defaultConfig(defaults.go:90)"}}`, //nolint:lll
35+
s: `Driver{Endpoint:"localhost",Database:"local",Secure:true,Credentials:Anonymous{From:"github.com/ydb-platform/ydb-go-sdk/v3/config.defaultConfig(defaults.go:80)"}}`, //nolint:lll
3636
},
3737
{
3838
name: xtest.CurrentFileLine(),

internal/balancer/balancer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
2222
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
2323
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
24+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xresolver"
2425
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xslices"
2526
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
2627
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
@@ -186,6 +187,9 @@ func makeDiscoveryFunc(
186187
"ydb:///"+driverConfig.Endpoint(),
187188
append(
188189
driverConfig.GrpcDialOptions(),
190+
grpc.WithResolvers(
191+
xresolver.New("ydb", driverConfig.Trace()),
192+
),
189193
grpc.WithBlock(),
190194
grpc.WithDefaultServiceConfig(`{
191195
"loadBalancingPolicy": "pick_first"

internal/conn/conn.go

Lines changed: 44 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -48,19 +48,26 @@ type Conn interface {
4848
Unban(ctx context.Context) State
4949
}
5050

51-
type conn struct {
52-
mtx sync.RWMutex
53-
config Config // ro access
54-
grpcConn *grpc.ClientConn
55-
done chan struct{}
56-
endpoint endpoint.Endpoint // ro access
57-
closed bool
58-
state atomic.Uint32
59-
childStreams *xcontext.CancelsGuard
60-
lastUsage xsync.LastUsage
61-
onClose []func(*conn)
62-
onTransportErrors []func(ctx context.Context, cc Conn, cause error)
63-
}
51+
type (
52+
connConfig interface {
53+
Trace() *trace.Driver
54+
DialTimeout() time.Duration
55+
GrpcDialOptions() []grpc.DialOption
56+
}
57+
conn struct {
58+
mtx sync.RWMutex
59+
config connConfig // ro access
60+
grpcConn *grpc.ClientConn
61+
done chan struct{}
62+
endpoint endpoint.Endpoint // ro access
63+
closed bool
64+
state atomic.Uint32
65+
childStreams *xcontext.CancelsGuard
66+
lastUsage xsync.LastUsage
67+
onClose []func(*conn)
68+
onTransportErrors []func(ctx context.Context, cc Conn, cause error)
69+
}
70+
)
6471

6572
func (c *conn) Address() string {
6673
return c.endpoint.Address()
@@ -177,18 +184,6 @@ func (c *conn) GetState() (s State) {
177184
return State(c.state.Load())
178185
}
179186

180-
func makeDialOption(overrideHost string) []grpc.DialOption {
181-
dialOption := []grpc.DialOption{
182-
grpc.WithStatsHandler(statsHandler{}),
183-
}
184-
185-
if len(overrideHost) != 0 {
186-
dialOption = append(dialOption, grpc.WithAuthority(overrideHost))
187-
}
188-
189-
return dialOption
190-
}
191-
192187
func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
193188
if c.isClosed() {
194189
return nil, xerrors.WithStackTrace(errClosedConnection)
@@ -201,31 +196,37 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
201196
return c.grpcConn, nil
202197
}
203198

204-
if dialTimeout := c.config.DialTimeout(); dialTimeout > 0 {
205-
var cancel context.CancelFunc
206-
ctx, cancel = xcontext.WithTimeout(ctx, dialTimeout)
207-
defer cancel()
208-
}
199+
return c.dial(ctx)
200+
}
209201

202+
// c.mtx must be locked
203+
func (c *conn) dial(ctx context.Context) (cc *grpc.ClientConn, err error) {
210204
onDone := trace.DriverOnConnDial(
211205
c.config.Trace(), &ctx,
212-
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/conn.(*conn).realConn"),
206+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/conn.(*conn).dial"),
213207
c.endpoint.Copy(),
214208
)
215209
defer func() {
216210
onDone(err)
217211
}()
218212

219-
// prepend "ydb" scheme for grpc dns-resolver to find the proper scheme
220-
// three slashes in "ydb:///" is ok. It needs for good parse scheme in grpc resolver.
221-
address := "ydb:///" + c.endpoint.Address()
213+
if dialTimeout := c.config.DialTimeout(); dialTimeout > 0 {
214+
var cancel context.CancelFunc
215+
ctx, cancel = xcontext.WithTimeout(ctx, dialTimeout)
216+
defer cancel()
217+
}
218+
219+
address := c.endpoint.Address()
222220

223-
dialOption := makeDialOption(c.endpoint.OverrideHost())
221+
dialOpts := c.config.GrpcDialOptions()
224222

225-
cc, err = grpc.DialContext(ctx, address, append( //nolint:staticcheck,nolintlint
226-
dialOption,
227-
c.config.GrpcDialOptions()...,
228-
)...)
223+
dialOpts = append(dialOpts, grpc.WithStatsHandler(statsHandler{}))
224+
225+
if overrideHost := c.endpoint.OverrideHost(); overrideHost != "" {
226+
dialOpts = append(dialOpts, grpc.WithAuthority(overrideHost))
227+
}
228+
229+
cc, err = grpc.DialContext(ctx, address, dialOpts...)
229230
if err != nil {
230231
if xerrors.IsContextError(err) {
231232
return nil, xerrors.WithStackTrace(err)
@@ -238,7 +239,9 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
238239
return nil, xerrors.WithStackTrace(
239240
xerrors.Retryable(
240241
xerrors.Transport(err),
241-
xerrors.WithName("realConn"),
242+
xerrors.WithName(
243+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/conn.(*conn).dial").FunctionID(),
244+
),
242245
),
243246
)
244247
}
@@ -581,7 +584,7 @@ func withOnTransportError(onTransportError func(ctx context.Context, cc Conn, ca
581584
}
582585
}
583586

584-
func newConn(e endpoint.Endpoint, config Config, opts ...option) *conn {
587+
func newConn(e endpoint.Endpoint, config connConfig, opts ...option) *conn {
585588
c := &conn{
586589
endpoint: e,
587590
config: config,
@@ -604,10 +607,6 @@ func newConn(e endpoint.Endpoint, config Config, opts ...option) *conn {
604607
return c
605608
}
606609

607-
func New(e endpoint.Endpoint, config Config, opts ...option) Conn {
608-
return newConn(e, config, opts...)
609-
}
610-
611610
var _ stats.Handler = statsHandler{}
612611

613612
type statsHandler struct{}

0 commit comments

Comments
 (0)