Skip to content

Commit 55762c7

Browse files
authored
Merge pull request #1630 from ydb-platform/KIKIMR-22430
no endpoints retriable error now
2 parents 94a33dc + f5cfcd2 commit 55762c7

File tree

6 files changed

+84
-35
lines changed

6 files changed

+84
-35
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* "No endpoints" is retriable error now
2+
13
## v3.99.3
24
* Fixed potential infinity loop for local dc detection (CWE-835)
35
* Fixed nil pointer dereferenced in a topic listener (CWE-476)

internal/balancer/balancer.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"google.golang.org/grpc"
1111

1212
"github.com/ydb-platform/ydb-go-sdk/v3/config"
13+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/backoff"
1314
balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
1415
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
1516
"github.com/ydb-platform/ydb-go-sdk/v3/internal/credentials"
@@ -28,7 +29,7 @@ import (
2829
)
2930

3031
var (
31-
ErrNoEndpoints = xerrors.Wrap(fmt.Errorf("no endpoints"))
32+
ErrNoEndpoints = xerrors.Wrap(xerrors.Retryable(fmt.Errorf("no endpoints"), xerrors.WithBackoff(backoff.TypeSlow)))
3233
errBalancerClosed = xerrors.Wrap(fmt.Errorf("internal ydb sdk balancer closed"))
3334
)
3435

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -424,14 +424,24 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) {
424424
}
425425
}
426426

427-
writer, err := w.startWriteStream(ctx, streamCtx, attempt)
427+
onWriterStarted := trace.TopicOnWriterReconnect(
428+
w.cfg.Tracer,
429+
w.writerInstanceID,
430+
w.cfg.topic,
431+
w.cfg.producerID,
432+
attempt,
433+
)
434+
435+
writer, err := w.startWriteStream(ctx, streamCtx)
428436
w.onWriterChange(writer)
437+
onStreamError := onWriterStarted(err)
429438
if err == nil {
430439
reconnectReason = writer.WaitClose(ctx)
431440
startOfRetries = time.Now()
432441
} else {
433442
reconnectReason = err
434443
}
444+
onStreamError(reconnectReason)
435445
}
436446
}
437447

@@ -467,21 +477,10 @@ func (w *WriterReconnector) handleReconnectRetry(
467477
return false
468478
}
469479

470-
func (w *WriterReconnector) startWriteStream(ctx, streamCtx context.Context, attempt int) (
480+
func (w *WriterReconnector) startWriteStream(ctx, streamCtx context.Context) (
471481
writer *SingleStreamWriter,
472482
err error,
473483
) {
474-
traceOnDone := trace.TopicOnWriterReconnect(
475-
w.cfg.Tracer,
476-
w.writerInstanceID,
477-
w.cfg.topic,
478-
w.cfg.producerID,
479-
attempt,
480-
)
481-
defer func() {
482-
traceOnDone(err)
483-
}()
484-
485484
stream, err := w.connectWithTimeout(streamCtx)
486485
if err != nil {
487486
return nil, err

log/topic.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -640,7 +640,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
640640
///
641641
t.OnWriterReconnect = func(
642642
info trace.TopicWriterReconnectStartInfo,
643-
) func(doneInfo trace.TopicWriterReconnectDoneInfo) {
643+
) func(doneInfo trace.TopicWriterReconnectConnectedInfo) func(reconnectDoneInfo trace.TopicWriterReconnectDoneInfo) {
644644
if d.Details()&trace.TopicWriterStreamLifeCycleEvents == 0 {
645645
return nil
646646
}
@@ -653,8 +653,9 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
653653
kv.Int("attempt", info.Attempt),
654654
)
655655

656-
return func(doneInfo trace.TopicWriterReconnectDoneInfo) {
657-
if doneInfo.Error == nil {
656+
return func(doneInfo trace.TopicWriterReconnectConnectedInfo) func(reconnectDoneInfo trace.TopicWriterReconnectDoneInfo) { //nolint:lll
657+
connectedTime := time.Now()
658+
if doneInfo.ConnectionResult == nil {
658659
l.Log(WithLevel(ctx, DEBUG), "connect to topic writer stream completed",
659660
kv.String("topic", info.Topic),
660661
kv.String("producer_id", info.ProducerID),
@@ -664,14 +665,23 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
664665
)
665666
} else {
666667
l.Log(WithLevel(ctx, WARN), "connect to topic writer stream completed",
667-
kv.Error(doneInfo.Error),
668+
kv.Error(doneInfo.ConnectionResult),
668669
kv.String("topic", info.Topic),
669670
kv.String("producer_id", info.ProducerID),
670671
kv.String("writer_instance_id", info.WriterInstanceID),
671672
kv.Int("attempt", info.Attempt),
672673
kv.Latency(start),
673674
)
674675
}
676+
677+
return func(reconnectDoneInfo trace.TopicWriterReconnectDoneInfo) {
678+
l.Log(WithLevel(ctx, INFO), "stop topic writer stream reason",
679+
kv.String("topic", info.Topic),
680+
kv.String("producer_id", info.ProducerID),
681+
kv.String("writer_instance_id", info.WriterInstanceID),
682+
kv.Duration("write with topic writer stream duration", time.Since(connectedTime)),
683+
kv.NamedError("reason", reconnectDoneInfo.Error))
684+
}
675685
}
676686
}
677687
t.OnWriterInitStream = func(

trace/topic.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ type (
108108
// TopicWriterStreamLifeCycleEvents
109109

110110
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
111-
OnWriterReconnect func(TopicWriterReconnectStartInfo) func(TopicWriterReconnectDoneInfo)
111+
OnWriterReconnect func(TopicWriterReconnectStartInfo) func(TopicWriterReconnectConnectedInfo) func(TopicWriterReconnectDoneInfo) //nolint:lll
112112
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
113113
OnWriterInitStream func(TopicWriterInitStreamStartInfo) func(TopicWriterInitStreamDoneInfo)
114114
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
@@ -424,6 +424,10 @@ type (
424424
Attempt int
425425
}
426426

427+
TopicWriterReconnectConnectedInfo struct {
428+
ConnectionResult error
429+
}
430+
427431
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
428432
TopicWriterReconnectDoneInfo struct {
429433
Error error

trace/topic_gtrace.go

Lines changed: 49 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)