Skip to content

Commit e3ec5cb

Browse files
committed
* Added trace.Driver.OnRouter{Init,Close,SelectCandidate,Discovery} events
* Marked cluster events as deprecated
1 parent 1a4515a commit e3ec5cb

File tree

8 files changed

+483
-221
lines changed

8 files changed

+483
-221
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
* Implemented detection of local data-center with measuring tcp dial RTT
2-
* Added `trace.Driver.OnRouterDiscovery` event
2+
* Added `trace.Driver.OnBalancer{Init,Close,ChooseEndpoint,Update}` events
3+
* Marked the driver cluster events as deprecated
34
* Simplified the balancing logic
45

56
## v3.25.3

connection.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,17 @@ func open(ctx context.Context, opts ...Option) (_ Connection, err error) {
390390
return nil, xerrors.WithStackTrace(errors.New("configuration: empty database"))
391391
}
392392

393+
onDone := trace.DriverOnInit(
394+
c.config.Trace(),
395+
&ctx,
396+
c.config.Endpoint(),
397+
c.config.Database(),
398+
c.config.Secure(),
399+
)
400+
defer func() {
401+
onDone(err)
402+
}()
403+
393404
if c.pool == nil {
394405
c.pool = conn.NewPool(
395406
ctx,

internal/router/router.go

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type router struct {
3535

3636
func (r *router) clusterDiscovery(ctx context.Context) (err error) {
3737
var (
38-
onDone = trace.DriverOnRouterDiscovery(
38+
onDone = trace.DriverOnBalancerUpdate(
3939
r.driverConfig.Trace(),
4040
&ctx,
4141
r.routerConfig.DetectlocalDC,
@@ -93,6 +93,14 @@ func (r *router) Discovery() discovery.Client {
9393
}
9494

9595
func (r *router) Close(ctx context.Context) (err error) {
96+
onDone := trace.DriverOnBalancerClose(
97+
r.driverConfig.Trace(),
98+
&ctx,
99+
)
100+
defer func() {
101+
onDone(err)
102+
}()
103+
96104
issues := make([]error, 0, 2)
97105

98106
if r.discoveryRepeater != nil {
@@ -116,12 +124,9 @@ func New(
116124
pool *conn.Pool,
117125
opts ...discoveryConfig.Option,
118126
) (_ Connection, err error) {
119-
onDone := trace.DriverOnInit(
127+
onDone := trace.DriverOnBalancerInit(
120128
c.Trace(),
121129
&ctx,
122-
c.Endpoint(),
123-
c.Database(),
124-
c.Secure(),
125130
)
126131
defer func() {
127132
onDone(err)
@@ -258,13 +263,31 @@ func (r *router) connections() *connectionsState {
258263
return r.connectionsState
259264
}
260265

261-
func (r *router) getConn(ctx context.Context) (conn.Conn, error) {
262-
state := r.connections()
263-
c, failedCount := state.GetConnection(ctx)
264-
if failedCount*2 > state.PreferredCount() {
265-
r.discoveryRepeater.Force()
266-
}
266+
func (r *router) getConn(ctx context.Context) (c conn.Conn, err error) {
267+
onDone := trace.DriverOnBalancerChooseEndpoint(
268+
r.driverConfig.Trace(),
269+
&ctx,
270+
)
271+
defer func() {
272+
if err == nil {
273+
onDone(c.Endpoint(), nil)
274+
} else {
275+
onDone(nil, err)
276+
}
277+
}()
278+
279+
var (
280+
state = r.connections()
281+
failedCount int
282+
)
283+
284+
defer func() {
285+
if failedCount*2 > state.PreferredCount() {
286+
r.discoveryRepeater.Force()
287+
}
288+
}()
267289

290+
c, failedCount = state.GetConnection(ctx)
268291
if c == nil {
269292
return nil, xerrors.WithStackTrace(ErrClusterEmpty)
270293
}

log/driver.go

Lines changed: 42 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -418,47 +418,24 @@ func Driver(l Logger, details trace.Details) (t trace.Driver) {
418418
}
419419
}
420420
}
421-
422-
t.OnRouterDiscovery = func(info trace.DriverRouterDiscoveryStartInfo) func(trace.DriverRouterDiscoveryDoneInfo) {
423-
l.Tracef(
424-
`router discovery start {needLocalDC: "%v"}`,
425-
info.NeedLocalDC,
426-
)
427-
start := time.Now()
428-
return func(info trace.DriverRouterDiscoveryDoneInfo) {
429-
if info.Error == nil {
430-
l.Infof(
431-
`router discovery done {latency:"%v", endpoints: "%v", detectedLocalDC: "%v"}`,
432-
time.Since(start),
433-
info.Endpoints,
434-
info.LocalDC,
435-
)
436-
} else {
437-
l.Errorf(
438-
`router discovery failed {latency:"%v", error: "%v"}`,
439-
time.Since(start),
440-
info.Error,
441-
)
442-
}
443-
}
444-
}
445421
}
446-
if details&trace.DriverClusterEvents != 0 {
422+
// nolint:nestif
423+
if details&trace.DriverBalancerEvents != 0 {
447424
// nolint:govet
448425
l := l.WithName(`cluster`)
449-
t.OnClusterInit = func(info trace.DriverClusterInitStartInfo) func(trace.DriverClusterInitDoneInfo) {
426+
t.OnBalancerInit = func(info trace.DriverBalancerInitStartInfo) func(trace.DriverBalancerInitDoneInfo) {
450427
l.Tracef(`init start`)
451428
start := time.Now()
452-
return func(info trace.DriverClusterInitDoneInfo) {
429+
return func(info trace.DriverBalancerInitDoneInfo) {
453430
l.Debugf(`init done {latency:"%v"}`,
454431
time.Since(start),
455432
)
456433
}
457434
}
458-
t.OnClusterClose = func(info trace.DriverClusterCloseStartInfo) func(trace.DriverClusterCloseDoneInfo) {
435+
t.OnBalancerClose = func(info trace.DriverBalancerCloseStartInfo) func(trace.DriverBalancerCloseDoneInfo) {
459436
l.Tracef(`close start`)
460437
start := time.Now()
461-
return func(info trace.DriverClusterCloseDoneInfo) {
438+
return func(info trace.DriverBalancerCloseDoneInfo) {
462439
if info.Error == nil {
463440
l.Tracef(`close done {latency:"%v"}`,
464441
time.Since(start),
@@ -472,24 +449,55 @@ func Driver(l Logger, details trace.Details) (t trace.Driver) {
472449
}
473450
}
474451
}
475-
t.OnClusterGet = func(info trace.DriverClusterGetStartInfo) func(trace.DriverClusterGetDoneInfo) {
476-
l.Tracef(`get start`)
452+
t.OnBalancerChooseEndpoint = func(
453+
info trace.DriverBalancerChooseEndpointStartInfo,
454+
) func(
455+
trace.DriverBalancerChooseEndpointDoneInfo,
456+
) {
457+
l.Tracef(`select endpoint start`)
477458
start := time.Now()
478-
return func(info trace.DriverClusterGetDoneInfo) {
459+
return func(info trace.DriverBalancerChooseEndpointDoneInfo) {
479460
if info.Error == nil {
480-
l.Tracef(`get done {latency:"%v",endpoint:%v}`,
461+
l.Tracef(`select endpoint done {latency:"%v",endpoint:%v}`,
481462
time.Since(start),
482463
info.Endpoint.String(),
483464
)
484465
} else {
485-
l.Warnf(`get failed {latency:"%v",error:"%s",version:"%s"}`,
466+
l.Warnf(`select endpoint failed {latency:"%v",error:"%s",version:"%s"}`,
486467
time.Since(start),
487468
info.Error,
488469
meta.Version,
489470
)
490471
}
491472
}
492473
}
474+
t.OnBalancerUpdate = func(
475+
info trace.DriverBalancerUpdateStartInfo,
476+
) func(
477+
trace.DriverBalancerUpdateDoneInfo,
478+
) {
479+
l.Tracef(
480+
`router discovery start {needLocalDC: "%v"}`,
481+
info.NeedLocalDC,
482+
)
483+
start := time.Now()
484+
return func(info trace.DriverBalancerUpdateDoneInfo) {
485+
if info.Error == nil {
486+
l.Infof(
487+
`router discovery done {latency:"%v", endpoints: "%v", detectedLocalDC: "%v"}`,
488+
time.Since(start),
489+
info.Endpoints,
490+
info.LocalDC,
491+
)
492+
} else {
493+
l.Errorf(
494+
`router discovery failed {latency:"%v", error: "%v"}`,
495+
time.Since(start),
496+
info.Error,
497+
)
498+
}
499+
}
500+
}
493501
}
494502
if details&trace.DriverCredentialsEvents != 0 {
495503
// nolint:govet

trace/details.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ type Details uint64
99
const (
1010
DriverNetEvents Details = 1 << iota // 1
1111
DriverConnEvents // 2
12-
DriverClusterEvents // 4
12+
DriverBalancerEvents // 4
1313
DriverResolverEvents // 8
1414
DriverRepeaterEvents // 16
1515
DriverCredentialsEvents // 32
@@ -34,9 +34,12 @@ const (
3434

3535
CoordinationEvents // 262144
3636

37+
// Deprecated: has no effect now
38+
DriverClusterEvents
39+
3740
DriverEvents = DriverNetEvents |
3841
DriverConnEvents |
39-
DriverClusterEvents |
42+
DriverBalancerEvents |
4043
DriverResolverEvents |
4144
DriverRepeaterEvents |
4245
DriverCredentialsEvents // 63
@@ -65,9 +68,9 @@ const (
6568
var (
6669
details = map[Details]string{
6770
DriverEvents: "ydb.driver",
68-
DriverClusterEvents: "ydb.driver.cluster",
69-
DriverNetEvents: "ydb.driver.xnet",
70-
DriverResolverEvents: "ydb.driver.xresolver",
71+
DriverBalancerEvents: "ydb.driver.balancer",
72+
DriverNetEvents: "ydb.driver.net",
73+
DriverResolverEvents: "ydb.driver.resolver",
7174
DriverRepeaterEvents: "ydb.driver.repeater",
7275
DriverConnEvents: "ydb.driver.conn",
7376
DriverCredentialsEvents: "ydb.driver.credentials",

trace/details_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ func TestDetailsMatch(t *testing.T) {
1818
details: DriverEvents,
1919
},
2020
{
21-
pattern: `^ydb\.driver\.xresolver$`,
21+
pattern: `^ydb\.driver\.resolver$`,
2222
details: DriverResolverEvents,
2323
},
2424
{
25-
pattern: `^ydb\.driver\.(xresolver|xnet)$`,
25+
pattern: `^ydb\.driver\.(resolver|net)$`,
2626
details: DriverResolverEvents | DriverNetEvents,
2727
},
2828
{
29-
pattern: `^ydb\.driver\.(conn|credentials|xresolver|xnet)$`,
29+
pattern: `^ydb\.driver\.(conn|credentials|resolver|net)$`,
3030
details: DriverConnEvents | DriverCredentialsEvents | DriverResolverEvents | DriverNetEvents,
3131
},
3232
{

0 commit comments

Comments
 (0)