Skip to content

Commit bfefc5d

Browse files
authored
Merge pull request #639 from ydb-platform/fix-retryable
* Fixed default state of `internal/xerrors.retryableError`: it inheri…
2 parents 629b22a + bda2e07 commit bfefc5d

File tree

3 files changed

+34
-19
lines changed

3 files changed

+34
-19
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
* Fixed default state of `internal/xerrors.retryableError`: it inherit properties from parent error as possible
2+
* Marked event `grpc/stats.End` as ignored at observing status of grpc connection
3+
14
## v3.42.12
25
* Replaced the balancer connection to discovery service from short-lived grpc connection to `internal/conn` lazy connection (revert related changes from `v3.42.6`)
36
* Marked as deprecated `trace.Driver.OnBalancerDialEntrypoint` event callback

internal/conn/conn.go

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -329,21 +329,20 @@ func (c *conn) Invoke(
329329

330330
err = cc.Invoke(ctx, method, req, res, append(opts, grpc.Trailer(&md))...)
331331
if err != nil {
332+
defer func() {
333+
c.onTransportError(ctx, err)
334+
}()
335+
332336
if useWrapping {
333337
err = xerrors.Transport(err,
334338
xerrors.WithAddress(c.Address()),
335339
)
336340
if sentMark.canRetry() {
337-
err = xerrors.Retryable(err,
338-
xerrors.WithName("Invoke"),
339-
xerrors.WithDeleteSession(),
340-
)
341+
return xerrors.WithStackTrace(xerrors.Retryable(err))
341342
}
342-
err = xerrors.WithStackTrace(err)
343+
return xerrors.WithStackTrace(err)
343344
}
344345

345-
c.onTransportError(ctx, err)
346-
347346
return err
348347
}
349348

@@ -417,18 +416,20 @@ func (c *conn) NewStream(
417416

418417
s, err = cc.NewStream(ctx, desc, method, opts...)
419418
if err != nil {
419+
defer func() {
420+
c.onTransportError(ctx, err)
421+
}()
422+
420423
if useWrapping {
421-
err = xerrors.Retryable(
422-
xerrors.Transport(err,
423-
xerrors.WithAddress(c.Address()),
424-
),
425-
xerrors.WithName("NewStream"),
426-
xerrors.WithDeleteSession(),
424+
err = xerrors.Transport(err,
425+
xerrors.WithAddress(c.Address()),
427426
)
427+
if sentMark.canRetry() {
428+
return s, xerrors.WithStackTrace(xerrors.Retryable(err))
429+
}
430+
return s, xerrors.WithStackTrace(err)
428431
}
429432

430-
c.onTransportError(ctx, err)
431-
432433
return s, err
433434
}
434435

@@ -495,6 +496,8 @@ func (statsHandler) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) {
495496
switch rpcStats.(type) {
496497
case *stats.Begin:
497498
getContextMark(ctx).markSafeToRetry()
499+
case *stats.End:
500+
// if data was sent - markDirty() was called early then stats.End received
498501
default:
499502
getContextMark(ctx).markDirty()
500503
}
@@ -511,7 +514,9 @@ type ctxHandleRPCKey struct{}
511514
var rpcKey = ctxHandleRPCKey{}
512515

513516
func markContext(ctx context.Context) (context.Context, *modificationMark) {
514-
mark := &modificationMark{}
517+
mark := &modificationMark{
518+
safeToRetry: 1,
519+
}
515520
return context.WithValue(ctx, rpcKey, mark), mark
516521
}
517522

internal/xerrors/retryable.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,16 @@ func WithDeleteSession() RetryableErrorOption {
6262
}
6363

6464
func Retryable(err error, opts ...RetryableErrorOption) error {
65-
re := &retryableError{
66-
err: err,
67-
name: "CUSTOM",
65+
var (
66+
e Error
67+
re = &retryableError{
68+
err: err,
69+
name: "CUSTOM",
70+
}
71+
)
72+
if As(err, &e) {
73+
re.backoffType = e.BackoffType()
74+
re.mustDeleteSession = e.MustDeleteSession()
6875
}
6976
for _, o := range opts {
7077
if o != nil {

0 commit comments

Comments
 (0)