Skip to content

Commit 2341772

Browse files
authored
Merge pull request #552 from ydb-platform/issue551
* Added `trace.Driver.{OnBalancerDialEntrypoint,OnBalancerClusterDiscoveryAttempt}` trace events
2 parents dfc7f63 + bca9630 commit 2341772

File tree

7 files changed

+256
-57
lines changed

7 files changed

+256
-57
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
* Added check of port in connection string and error throw
44
* Fixed bug with initialization of connection pool before apply static credentials
55
* Refactored the application of grpc dial options with defaults
6+
* Added `trace.Driver.{OnBalancerDialEntrypoint,OnBalancerClusterDiscoveryAttempt}` trace events
67

78
## v3.42.8
89
* Fixed `internal/scheme/helpers/IsDirectoryExists(..)` recursive bug

connection.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -457,20 +457,7 @@ func connect(ctx context.Context, c *connection) error {
457457
c.pool = conn.NewPool(c.config)
458458
}
459459

460-
c.balancer, err = balancer.New(ctx,
461-
c.config, c.pool,
462-
append(
463-
// prepend common params from root config
464-
[]discoveryConfig.Option{
465-
discoveryConfig.With(c.config.Common),
466-
discoveryConfig.WithEndpoint(c.Endpoint()),
467-
discoveryConfig.WithDatabase(c.Name()),
468-
discoveryConfig.WithSecure(c.Secure()),
469-
discoveryConfig.WithMeta(c.config.Meta()),
470-
},
471-
c.discoveryOptions...,
472-
)...,
473-
)
460+
c.balancer, err = balancer.New(ctx, c.config, c.pool, c.discoveryOptions...)
474461
if err != nil {
475462
return xerrors.WithStackTrace(err)
476463
}

internal/balancer/balancer.go

Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,19 @@ type Balancer struct {
3232
driverConfig config.Config
3333
balancerConfig balancerConfig.Config
3434
pool *conn.Pool
35-
discoveryClient func(ctx context.Context) (discoveryClient, error)
35+
discoveryClient func(ctx context.Context, address string) (discoveryClient, error)
3636
discoveryRepeater repeater.Repeater
3737
localDCDetector func(ctx context.Context, endpoints []endpoint.Endpoint) (string, error)
3838

3939
mu xsync.RWMutex
4040
connectionsState *connectionsState
4141

42-
onDiscovery []func(ctx context.Context, endpoints []endpoint.Info)
42+
onApplyDiscoveredEndpoints []func(ctx context.Context, endpoints []endpoint.Info)
4343
}
4444

45-
func (b *Balancer) OnUpdate(onDiscovery func(ctx context.Context, endpoints []endpoint.Info)) {
45+
func (b *Balancer) OnUpdate(onApplyDiscoveredEndpoints func(ctx context.Context, endpoints []endpoint.Info)) {
4646
b.mu.WithLock(func() {
47-
b.onDiscovery = append(b.onDiscovery, onDiscovery)
47+
b.onApplyDiscoveredEndpoints = append(b.onApplyDiscoveredEndpoints, onApplyDiscoveredEndpoints)
4848
})
4949
}
5050

@@ -66,22 +66,18 @@ func (b *Balancer) clusterDiscovery(ctx context.Context) (err error) {
6666

6767
func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
6868
var (
69-
onDone = trace.DriverOnBalancerUpdate(
69+
address = "ydb:///" + b.driverConfig.Endpoint()
70+
onDone = trace.DriverOnBalancerClusterDiscoveryAttempt(
7071
b.driverConfig.Trace(),
7172
&ctx,
72-
b.balancerConfig.DetectlocalDC,
73+
address,
7374
)
7475
endpoints []endpoint.Endpoint
7576
localDC string
7677
cancel context.CancelFunc
7778
)
78-
7979
defer func() {
80-
nodes := make([]trace.EndpointInfo, 0, len(endpoints))
81-
for _, e := range endpoints {
82-
nodes = append(nodes, e.Copy())
83-
}
84-
onDone(nodes, localDC, err)
80+
onDone(err)
8581
}()
8682

8783
if dialTimeout := b.driverConfig.DialTimeout(); dialTimeout > 0 {
@@ -91,7 +87,7 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
9187
}
9288
defer cancel()
9389

94-
client, err := b.discoveryClient(ctx)
90+
client, err := b.discoveryClient(ctx, address)
9591
if err != nil {
9692
return xerrors.WithStackTrace(err)
9793
}
@@ -117,6 +113,19 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
117113
}
118114

119115
func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []endpoint.Endpoint, localDC string) {
116+
onDone := trace.DriverOnBalancerUpdate(
117+
b.driverConfig.Trace(),
118+
&ctx,
119+
b.balancerConfig.DetectlocalDC,
120+
)
121+
defer func() {
122+
nodes := make([]trace.EndpointInfo, 0, len(endpoints))
123+
for _, e := range endpoints {
124+
nodes = append(nodes, e.Copy())
125+
}
126+
onDone(nodes, localDC, nil)
127+
}()
128+
120129
connections := endpointsToConnections(b.pool, endpoints)
121130
for _, c := range connections {
122131
b.pool.Allow(ctx, c)
@@ -133,8 +142,8 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end
133142

134143
b.mu.WithLock(func() {
135144
b.connectionsState = state
136-
for _, onDiscovery := range b.onDiscovery {
137-
onDiscovery(ctx, endpointsInfo)
145+
for _, onApplyDiscoveredEndpoints := range b.onApplyDiscoveredEndpoints {
146+
onApplyDiscoveredEndpoints(ctx, endpointsInfo)
138147
}
139148
})
140149
}
@@ -166,7 +175,13 @@ func New(
166175
driverConfig.Trace(),
167176
&ctx,
168177
)
169-
discoveryConfig = discoveryConfig.New(opts...)
178+
discoveryConfig = discoveryConfig.New(append(opts,
179+
discoveryConfig.With(driverConfig.Common),
180+
discoveryConfig.WithEndpoint(driverConfig.Endpoint()),
181+
discoveryConfig.WithDatabase(driverConfig.Database()),
182+
discoveryConfig.WithSecure(driverConfig.Secure()),
183+
discoveryConfig.WithMeta(driverConfig.Meta()),
184+
)...)
170185
)
171186
defer func() {
172187
onDone(err)
@@ -176,11 +191,16 @@ func New(
176191
driverConfig: driverConfig,
177192
pool: pool,
178193
localDCDetector: detectLocalDC,
179-
discoveryClient: func(ctx context.Context) (_ discoveryClient, err error) {
180-
cc, err := grpc.DialContext(ctx,
181-
"dns:///"+b.driverConfig.Endpoint(),
182-
b.driverConfig.GrpcDialOptions()...,
194+
discoveryClient: func(ctx context.Context, address string) (_ discoveryClient, err error) {
195+
onBalancerDialEntrypointDone := trace.DriverOnBalancerDialEntrypoint(
196+
b.driverConfig.Trace(),
197+
&ctx,
198+
address,
183199
)
200+
defer func() {
201+
onBalancerDialEntrypointDone(err)
202+
}()
203+
cc, err := grpc.DialContext(ctx, address, b.driverConfig.GrpcDialOptions()...)
184204
if err != nil {
185205
return nil, xerrors.WithStackTrace(err)
186206
}
@@ -195,11 +215,9 @@ func New(
195215
}
196216

197217
if b.balancerConfig.SingleConn {
198-
b.connectionsState = newConnectionsState(
199-
endpointsToConnections(pool, []endpoint.Endpoint{
200-
endpoint.New(driverConfig.Endpoint()),
201-
}),
202-
nil, balancerConfig.Info{}, false)
218+
b.applyDiscoveredEndpoints(ctx, []endpoint.Endpoint{
219+
endpoint.New(driverConfig.Endpoint()),
220+
}, "")
203221
} else {
204222
// initialization of balancer state
205223
if err = b.clusterDiscovery(ctx); err != nil {

internal/balancer/local_dc_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func TestLocalDCDiscovery(t *testing.T) {
142142
driverConfig: cfg,
143143
balancerConfig: *cfg.Balancer(),
144144
pool: conn.NewPool(cfg),
145-
discoveryClient: func(context.Context) (discoveryClient, error) {
145+
discoveryClient: func(context.Context, string) (discoveryClient, error) {
146146
return discoveryMock{endpoints: []endpoint.Endpoint{
147147
&mock.Endpoint{AddrField: "a:123", LocationField: "a"},
148148
&mock.Endpoint{AddrField: "b:234", LocationField: "b"},

log/driver.go

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -378,33 +378,67 @@ func Driver(l Logger, details trace.Details) (t trace.Driver) {
378378
}
379379
}
380380
}
381-
t.OnBalancerUpdate = func(
382-
info trace.DriverBalancerUpdateStartInfo,
381+
t.OnBalancerDialEntrypoint = func(
382+
info trace.DriverBalancerDialEntrypointStartInfo,
383383
) func(
384-
trace.DriverBalancerUpdateDoneInfo,
384+
trace.DriverBalancerDialEntrypointDoneInfo,
385385
) {
386-
l.Tracef(
387-
`balancer discovery start {needLocalDC: "%v"}`,
388-
info.NeedLocalDC,
389-
)
386+
l.Tracef(`trying to dial entrypoint {address:"%s"}`, info.Address)
390387
start := time.Now()
391-
return func(info trace.DriverBalancerUpdateDoneInfo) {
388+
return func(info trace.DriverBalancerDialEntrypointDoneInfo) {
392389
if info.Error == nil {
393-
l.Infof(
394-
`balancer discovery done {latency:"%v", endpoints: "%v", detectedLocalDC: "%v"}`,
390+
l.Tracef(`dial entrypoint done {latency:"%v"}`,
395391
time.Since(start),
396-
info.Endpoints,
397-
info.LocalDC,
398392
)
399393
} else {
400-
l.Errorf(
401-
`balancer discovery failed {latency:"%v", error: "%v"}`,
394+
l.Errorf(`dial entrypoint failed {latency:"%v",error:"%s",version:"%s"}`,
402395
time.Since(start),
403396
info.Error,
397+
meta.Version,
404398
)
405399
}
406400
}
407401
}
402+
t.OnBalancerClusterDiscoveryAttempt = func(
403+
info trace.DriverBalancerClusterDiscoveryAttemptStartInfo,
404+
) func(
405+
trace.DriverBalancerClusterDiscoveryAttemptDoneInfo,
406+
) {
407+
l.Tracef(`trying to cluster discovery {address:"%s"}`, info.Address)
408+
start := time.Now()
409+
return func(info trace.DriverBalancerClusterDiscoveryAttemptDoneInfo) {
410+
if info.Error == nil {
411+
l.Tracef(`cluster discovery done {latency:"%v"}`,
412+
time.Since(start),
413+
)
414+
} else {
415+
l.Errorf(`cluster discovery failed {latency:"%v",error:"%s",version:"%s"}`,
416+
time.Since(start),
417+
info.Error,
418+
meta.Version,
419+
)
420+
}
421+
}
422+
}
423+
t.OnBalancerUpdate = func(
424+
info trace.DriverBalancerUpdateStartInfo,
425+
) func(
426+
trace.DriverBalancerUpdateDoneInfo,
427+
) {
428+
l.Tracef(
429+
`balancer update start {needLocalDC: "%v"}`,
430+
info.NeedLocalDC,
431+
)
432+
start := time.Now()
433+
return func(info trace.DriverBalancerUpdateDoneInfo) {
434+
l.Infof(
435+
`balancer update done {latency:"%v", endpoints: "%v", detectedLocalDC: "%v"}`,
436+
time.Since(start),
437+
info.Endpoints,
438+
info.LocalDC,
439+
)
440+
}
441+
}
408442
}
409443
if details&trace.DriverCredentialsEvents != 0 {
410444
//nolint:govet

trace/driver.go

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,23 @@ type (
5353

5454
// Balancer events
5555
OnBalancerInit func(DriverBalancerInitStartInfo) func(DriverBalancerInitDoneInfo)
56+
OnBalancerDialEntrypoint func(
57+
DriverBalancerDialEntrypointStartInfo,
58+
) func(
59+
DriverBalancerDialEntrypointDoneInfo,
60+
)
5661
OnBalancerClose func(DriverBalancerCloseStartInfo) func(DriverBalancerCloseDoneInfo)
57-
OnBalancerChooseEndpoint func(DriverBalancerChooseEndpointStartInfo) func(DriverBalancerChooseEndpointDoneInfo)
58-
OnBalancerUpdate func(DriverBalancerUpdateStartInfo) func(DriverBalancerUpdateDoneInfo)
62+
OnBalancerChooseEndpoint func(
63+
DriverBalancerChooseEndpointStartInfo,
64+
) func(
65+
DriverBalancerChooseEndpointDoneInfo,
66+
)
67+
OnBalancerClusterDiscoveryAttempt func(
68+
DriverBalancerClusterDiscoveryAttemptStartInfo,
69+
) func(
70+
DriverBalancerClusterDiscoveryAttemptDoneInfo,
71+
)
72+
OnBalancerUpdate func(DriverBalancerUpdateStartInfo) func(DriverBalancerUpdateDoneInfo)
5973

6074
// Credentials events
6175
OnGetCredentials func(DriverGetCredentialsStartInfo) func(DriverGetCredentialsDoneInfo)
@@ -137,7 +151,19 @@ type (
137151
DriverBalancerUpdateDoneInfo struct {
138152
Endpoints []EndpointInfo
139153
LocalDC string
140-
Error error
154+
// Deprecated: this field is always nil
155+
Error error
156+
}
157+
DriverBalancerClusterDiscoveryAttemptStartInfo struct {
158+
// Context make available context in trace callback function.
159+
// Pointer to context provide replacement of context in trace callback function.
160+
// Warning: concurrent access to pointer on client side must be excluded.
161+
// Safe replacement of context are provided only inside callback function
162+
Context *context.Context
163+
Address string
164+
}
165+
DriverBalancerClusterDiscoveryAttemptDoneInfo struct {
166+
Error error
141167
}
142168
DriverNetReadStartInfo struct {
143169
Address string
@@ -273,6 +299,17 @@ type (
273299
DriverBalancerInitDoneInfo struct {
274300
Error error
275301
}
302+
DriverBalancerDialEntrypointStartInfo struct {
303+
// Context make available context in trace callback function.
304+
// Pointer to context provide replacement of context in trace callback function.
305+
// Warning: concurrent access to pointer on client side must be excluded.
306+
// Safe replacement of context are provided only inside callback function
307+
Context *context.Context
308+
Address string
309+
}
310+
DriverBalancerDialEntrypointDoneInfo struct {
311+
Error error
312+
}
276313
DriverBalancerCloseStartInfo struct {
277314
// Context make available context in trace callback function.
278315
// Pointer to context provide replacement of context in trace callback function.

0 commit comments

Comments
 (0)