-
Notifications
You must be signed in to change notification settings - Fork 28
Add: RetryPolicy #495
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add: RetryPolicy #495
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| namespace Ydb.Sdk.Retry; | ||
|
|
||
/// <summary>
/// Strategy consulted by the retry loop after every attempt of an operation.
/// </summary>
public interface IRetryPolicy
{
    /// <summary>
    /// Chooses the next step after a failed attempt.
    /// </summary>
    /// <param name="ctx">Snapshot of the retry loop (attempt number, timings, last failure).</param>
    /// <returns>
    /// The decision for the next step. <see cref="RetryExecutor"/> rethrows the failure when
    /// <see cref="RetryDecision.Delay"/> is <c>null</c>.
    /// </returns>
    RetryDecision Decide(in RetryContext ctx);

    /// <summary>
    /// Receives the outcome of an attempt.
    /// </summary>
    /// <param name="ctx">Snapshot of the retry loop at the moment the outcome was known.</param>
    /// <param name="success"><c>true</c> when the attempt completed without throwing.</param>
    void ReportResult(in RetryContext ctx, bool success);
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| namespace Ydb.Sdk.Retry; | ||
|
|
||
/// <summary>
/// Category of the operation being retried; carried in <see cref="RetryContext.Operation"/>.
/// </summary>
/// <remarks>The numeric values are the ones the compiler assigns implicitly, spelled out.</remarks>
public enum OperationKind
{
    Read = 0,
    Write = 1,
    Schema = 2,
    Stream = 3,
    TopicProduce = 4,
    TopicConsume = 5,
    Discovery = 6
}
|
|
||
/// <summary>
/// Coarse backoff class that a policy maps to a concrete delay.
/// </summary>
/// <remarks>
/// <see cref="YdbRetryPolicy"/> treats <see cref="None"/> as "do not retry",
/// <see cref="Instant"/> as a zero delay, and <see cref="Fast"/> / <see cref="Slow"/>
/// as two jittered exponential ranges.
/// </remarks>
public enum BackoffTier
{
    None = 0,
    Instant = 1,
    Fast = 2,
    Slow = 3
}
|
|
||
/// <summary>
/// Describes a failed attempt: the exception plus whatever status codes could be extracted from it.
/// </summary>
/// <param name="Exception">The exception thrown by the attempt.</param>
/// <param name="YdbStatusCode">Numeric YDB status code, or <c>null</c> when none was extracted.</param>
/// <param name="GrpcStatusCode">Numeric gRPC status code, or <c>null</c> when none was extracted.</param>
public readonly record struct Failure(Exception Exception, int? YdbStatusCode = null, int? GrpcStatusCode = null);
|
|
||
/// <summary>
/// Immutable snapshot of the retry loop handed to <see cref="IRetryPolicy"/>.
/// </summary>
/// <param name="Attempt">Attempt number; <see cref="RetryExecutor"/> starts counting at 1.</param>
/// <param name="Elapsed">Time since the retry loop started.</param>
/// <param name="DeadlineLeft">
/// Remaining time budget, or <c>null</c> when no overall timeout was given.
/// May be zero or negative once the budget is used up.
/// </param>
/// <param name="IsIdempotent">Whether the caller declared the operation idempotent.</param>
/// <param name="Operation">Category of the operation.</param>
/// <param name="LastFailure">Most recent failure, or <c>null</c> when there has been none.</param>
public readonly record struct RetryContext(
    int Attempt,
    TimeSpan Elapsed,
    TimeSpan? DeadlineLeft,
    bool IsIdempotent,
    OperationKind Operation,
    Failure? LastFailure);
|
|
||
/// <summary>
/// Outcome of <see cref="IRetryPolicy.Decide"/>.
/// </summary>
/// <param name="Delay">
/// How long to wait before the next attempt; <c>null</c> means "do not retry"
/// (<see cref="RetryExecutor"/> rethrows the failure in that case).
/// </param>
/// <param name="RecreateSession">Ask the caller to recreate the session before retrying.</param>
/// <param name="ResetTransport">Ask the caller to reset the transport before retrying.</param>
/// <param name="Hedge">Hint that the retry may be hedged.</param>
public readonly record struct RetryDecision(
    TimeSpan? Delay,
    bool RecreateSession = false,
    bool ResetTransport = false,
    bool Hedge = false);
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,96 @@ | ||
| using System.Diagnostics; | ||
| using Grpc.Core; | ||
|
|
||
| namespace Ydb.Sdk.Retry; | ||
|
|
||
/// <summary>
/// Runs an asynchronous operation in a loop, asking an <see cref="IRetryPolicy"/> after each
/// failure whether, and after what delay, to try again.
/// </summary>
public static class RetryExecutor
{
    // Upper bound accepted by every CancellationTokenSource(TimeSpan) implementation.
    private static readonly TimeSpan MaxTimerTimeout = TimeSpan.FromMilliseconds(int.MaxValue);

    /// <summary>
    /// Executes <paramref name="op"/> until it succeeds, the policy declines to retry,
    /// or cancellation / the overall timeout fires.
    /// </summary>
    /// <typeparam name="T">Result type of the operation.</typeparam>
    /// <param name="op">The operation; receives a token that is cancelled by <paramref name="ct"/> or the overall timeout.</param>
    /// <param name="policy">Policy consulted after every failed attempt and notified after success.</param>
    /// <param name="isIdempotent">Passed through to the policy via <see cref="RetryContext"/>.</param>
    /// <param name="operationKind">Passed through to the policy via <see cref="RetryContext"/>.</param>
    /// <param name="overallTimeout">
    /// Total time budget for all attempts and delays; <c>null</c> for none. A negative value is
    /// treated as an already expired budget, and a value too large for a timer as "no timer".
    /// </param>
    /// <param name="recreateSession">Invoked before the next attempt when the policy sets <see cref="RetryDecision.RecreateSession"/>.</param>
    /// <param name="classify">Optional mapping from an exception to a <see cref="Failure"/>; replaces the built-in mapping.</param>
    /// <param name="ct">Caller's cancellation token.</param>
    /// <returns>The result of the first successful attempt.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="op"/> or <paramref name="policy"/> is <c>null</c>.</exception>
    /// <remarks>
    /// <see cref="RetryDecision.ResetTransport"/> and <see cref="RetryDecision.Hedge"/> are not acted upon here.
    /// NOTE(review): failed attempts are never passed to <see cref="IRetryPolicy.ReportResult"/>;
    /// only the final success is reported. Confirm this is intended.
    /// </remarks>
    public static async Task<T> RunAsync<T>(
        Func<CancellationToken, Task<T>> op,
        IRetryPolicy policy,
        bool isIdempotent,
        OperationKind operationKind,
        TimeSpan? overallTimeout = null,
        Func<Task>? recreateSession = null,
        Func<Exception, Failure?>? classify = null,
        CancellationToken ct = default)
    {
        if (op is null)
        {
            throw new ArgumentNullException(nameof(op));
        }

        if (policy is null)
        {
            throw new ArgumentNullException(nameof(policy));
        }

        var sw = Stopwatch.StartNew();
        using var tmo = CreateTimeoutSource(overallTimeout);
        using var linked = tmo is null
            ? CancellationTokenSource.CreateLinkedTokenSource(ct)
            : CancellationTokenSource.CreateLinkedTokenSource(ct, tmo.Token);

        var attempt = 0;
        Exception? last = null;

        while (true)
        {
            attempt++;

            T res;
            try
            {
                res = await op(linked.Token).ConfigureAwait(false);
            }
            catch (Exception ex) when (!linked.Token.IsCancellationRequested)
            {
                last = ex;
                var ctx = new RetryContext(
                    attempt,
                    sw.Elapsed,
                    GetTimeLeft(overallTimeout, sw),
                    isIdempotent,
                    operationKind,
                    ToFailure(ex, classify));
                var decision = policy.Decide(ctx);

                // A null delay is the policy's way of saying "give up": surface the failure as-is.
                if (decision.Delay is not { } delay)
                {
                    throw;
                }

                if (decision.RecreateSession && recreateSession is not null)
                {
                    await recreateSession().ConfigureAwait(false);
                }

                if (delay > TimeSpan.Zero)
                {
                    await Task.Delay(delay, linked.Token).ConfigureAwait(false);
                }

                continue;
            }

            // Reported outside the try block on purpose: an exception thrown by the policy must not
            // be mistaken for a failure of the operation and trigger a retry of work that succeeded.
            policy.ReportResult(
                new RetryContext(
                    attempt,
                    sw.Elapsed,
                    GetTimeLeft(overallTimeout, sw),
                    isIdempotent,
                    operationKind,
                    last is not null ? ToFailure(last, classify) : null),
                true);
            return res;
        }
    }

    /// <summary>
    /// Creates the timer-backed source for the overall timeout, or <c>null</c> when no timer is needed.
    /// </summary>
    private static CancellationTokenSource? CreateTimeoutSource(TimeSpan? overallTimeout)
    {
        if (overallTimeout is not { } timeout)
        {
            return null;
        }

        // Infinite or beyond what a timer accepts: no timer at all.
        if (timeout == Timeout.InfiniteTimeSpan || timeout > MaxTimerTimeout)
        {
            return null;
        }

        // An already expired budget yields a source that is cancelled right away,
        // instead of an ArgumentOutOfRangeException from the constructor.
        return new CancellationTokenSource(timeout < TimeSpan.Zero ? TimeSpan.Zero : timeout);
    }

    /// <summary>Remaining budget (may be negative), or <c>null</c> when there is no overall timeout.</summary>
    private static TimeSpan? GetTimeLeft(TimeSpan? overall, Stopwatch sw) =>
        overall is null ? null : overall.Value - sw.Elapsed;

    /// <summary>
    /// Converts an exception to a <see cref="Failure"/>, preferring the caller-supplied mapping.
    /// </summary>
    private static Failure? ToFailure(Exception ex, Func<Exception, Failure?>? custom)
    {
        if (custom is not null)
        {
            return custom(ex);
        }

        if (ex is RpcException rx)
        {
            return new Failure(rx, null, (int)rx.StatusCode);
        }

        if (TryGetYdbStatusCode(ex, out var code))
        {
            return new Failure(ex, code);
        }

        return new Failure(ex);
    }

    /// <summary>
    /// Reads an <c>int</c> stored under the "StatusCode" key of <see cref="Exception.Data"/>
    /// for exceptions whose type name contains "Ydb".
    /// </summary>
    private static bool TryGetYdbStatusCode(Exception ex, out int ydbStatusCode)
    {
        // The non-generic IDictionary indexer yields null for a missing key, so one lookup is enough.
        if (ex.GetType().Name.Contains("Ydb", StringComparison.Ordinal) && ex.Data["StatusCode"] is int code)
        {
            ydbStatusCode = code;
            return true;
        }

        ydbStatusCode = default;
        return false;
    }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| namespace Ydb.Sdk.Retry; | ||
|
|
||
/// <summary>
/// Factory for the simplest <see cref="IRetryPolicy"/> implementations.
/// </summary>
public static class RetryPrimitives
{
    /// <summary>Shared policy that never retries.</summary>
    public static IRetryPolicy NoRetry { get; } = new NoRetryPolicy();

    /// <summary>
    /// Creates a policy that waits the same <paramref name="delay"/> between attempts.
    /// </summary>
    /// <param name="delay">Delay returned for every retry.</param>
    /// <param name="maxAttempts">Retries stop once the attempt number reaches this value.</param>
    public static IRetryPolicy FixedDelay(TimeSpan delay, int maxAttempts = 3)
    {
        return new FixedDelayPolicy(delay, maxAttempts);
    }

    /// <summary>Always answers "do not retry".</summary>
    private sealed class NoRetryPolicy : IRetryPolicy
    {
        public RetryDecision Decide(in RetryContext ctx)
        {
            return new RetryDecision(null);
        }

        public void ReportResult(in RetryContext ctx, bool success)
        {
        }
    }

    /// <summary>Retries with a constant pause while the attempt number is below the limit.</summary>
    private sealed class FixedDelayPolicy : IRetryPolicy
    {
        private readonly TimeSpan _pause;
        private readonly int _attemptLimit;

        public FixedDelayPolicy(TimeSpan delay, int maxAttempts)
        {
            _pause = delay;
            _attemptLimit = maxAttempts;
        }

        public RetryDecision Decide(in RetryContext ctx)
        {
            // Limit reached: a null delay tells the executor to give up.
            if (ctx.Attempt >= _attemptLimit)
            {
                return new RetryDecision(null);
            }

            return new RetryDecision(_pause);
        }

        public void ReportResult(in RetryContext ctx, bool success)
        {
        }
    }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| using Grpc.Core; | ||
| using Grpc.Core.Interceptors; | ||
|
|
||
| namespace Ydb.Sdk.Retry; | ||
|
|
||
/// <summary>
/// gRPC client interceptor that runs unary calls through <see cref="RetryExecutor"/>.
/// </summary>
public sealed class RetryingInterceptor : Interceptor
{
    // Upper bound accepted by every CancellationTokenSource(TimeSpan) implementation.
    private static readonly TimeSpan MaxOverallTimeout = TimeSpan.FromMilliseconds(int.MaxValue);

    private readonly IRetryPolicy _policy;
    private readonly TimeSpan _defaultDeadline;

    /// <summary>
    /// Creates the interceptor.
    /// </summary>
    /// <param name="policy">Policy consulted after every failed attempt.</param>
    /// <param name="defaultDeadline">Time budget used when the call carries no deadline; 30 seconds when omitted.</param>
    /// <exception cref="ArgumentNullException"><paramref name="policy"/> is <c>null</c>.</exception>
    public RetryingInterceptor(IRetryPolicy policy, TimeSpan? defaultDeadline = null)
    {
        _policy = policy ?? throw new ArgumentNullException(nameof(policy));
        _defaultDeadline = defaultDeadline ?? TimeSpan.FromSeconds(30);
    }

    /// <summary>
    /// Starts the unary call and transparently re-issues it as the policy directs.
    /// </summary>
    /// <remarks>
    /// The returned call's status, trailers and headers are taken from the most recent underlying
    /// attempt. Response headers become available only once the retry loop has finished.
    /// Disposing the returned call cancels the retry loop and disposes the current attempt.
    /// </remarks>
    public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
        TRequest request,
        ClientInterceptorContext<TRequest, TResponse> context,
        AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
    {
        var isRead = GuessIdempotency(context.Method);
        var kind = Classify(context.Method);
        var overallTimeout = GetOverallTimeout(context.Options.Deadline);

        // Cancelled by the caller's token or by disposing the returned call, so that
        // delays between attempts stop as well, not just the in-flight attempt.
        var callCts = CancellationTokenSource.CreateLinkedTokenSource(context.Options.CancellationToken);

        // Most recent underlying attempt; set synchronously before this method returns.
        AsyncUnaryCall<TResponse>? current = null;

        async Task<TResponse> Do(CancellationToken ct)
        {
            var call = continuation(request, context);
            current = call;
            await using var registration = ct.Register(() =>
            {
                try
                {
                    call.Dispose();
                }
                catch (Exception)
                {
                    // Best effort: an exception escaping a cancellation callback would
                    // surface from whoever triggered the cancellation.
                }
            });
            return await call.ResponseAsync.ConfigureAwait(false);
        }

        async Task<TResponse> RunWithRetriesAsync()
        {
            try
            {
                return await RetryExecutor.RunAsync(
                    op: Do,
                    policy: _policy,
                    isIdempotent: isRead,
                    operationKind: kind,
                    overallTimeout: overallTimeout,
                    ct: callCts.Token
                ).ConfigureAwait(false);
            }
            finally
            {
                callCts.Dispose();
            }
        }

        AsyncUnaryCall<TResponse> CurrentCall() =>
            current ?? throw new InvalidOperationException("The underlying call has not been started.");

        var responseTask = RunWithRetriesAsync();

        async Task<Grpc.Core.Metadata> ResponseHeadersAsync()
        {
            // Wait for the loop to settle so the headers belong to the attempt that produced the response.
            await responseTask.ConfigureAwait(false);
            return await CurrentCall().ResponseHeadersAsync.ConfigureAwait(false);
        }

        var headersTask = ResponseHeadersAsync();
        // Callers rarely await headers; mark a fault as observed so it does not
        // resurface through TaskScheduler.UnobservedTaskException.
        _ = headersTask.ContinueWith(
            static t => _ = t.Exception,
            CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);

        return new AsyncUnaryCall<TResponse>(
            responseTask,
            headersTask,
            () => CurrentCall().GetStatus(),
            () => CurrentCall().GetTrailers(),
            () =>
            {
                try
                {
                    callCts.Cancel();
                }
                catch (ObjectDisposedException)
                {
                    // The retry loop already finished and released the source.
                }

                current?.Dispose();
            });
    }

    /// <summary>
    /// Converts the call's absolute deadline into a time budget for the retry loop.
    /// </summary>
    private TimeSpan GetOverallTimeout(DateTime? deadline)
    {
        // DateTime.MaxValue is treated like "no deadline" and falls back to the default budget.
        if (deadline is not { } dl || dl == DateTime.MaxValue)
        {
            return _defaultDeadline;
        }

        var left = dl - DateTime.UtcNow;
        if (left < TimeSpan.Zero)
        {
            // Already expired: an empty budget instead of a negative one.
            return TimeSpan.Zero;
        }

        return left > MaxOverallTimeout ? MaxOverallTimeout : left;
    }

    /// <summary>
    /// Heuristic: a unary method whose full name contains "Read", "Describe" or "Get" is treated as idempotent.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this is a case-insensitive substring match on the full name (service included),
    /// so unrelated names containing e.g. "get" also match — confirm against the real method list.
    /// </remarks>
    private static bool GuessIdempotency(IMethod method) =>
        method.Type == MethodType.Unary &&
        (method.FullName.Contains("Read", StringComparison.OrdinalIgnoreCase)
         || method.FullName.Contains("Describe", StringComparison.OrdinalIgnoreCase)
         || method.FullName.Contains("Get", StringComparison.OrdinalIgnoreCase));

    /// <summary>
    /// Maps a method to an <see cref="OperationKind"/> by substring of its full name;
    /// anything unmatched is reported as <see cref="OperationKind.Read"/>.
    /// </summary>
    private static OperationKind Classify(IMethod method)
    {
        var n = method.FullName;
        if (n.Contains("Topic", StringComparison.OrdinalIgnoreCase))
        {
            return OperationKind.Stream;
        }

        if (n.Contains("Scheme", StringComparison.OrdinalIgnoreCase))
        {
            return OperationKind.Schema;
        }

        if (n.Contains("Discovery", StringComparison.OrdinalIgnoreCase))
        {
            return OperationKind.Discovery;
        }

        return OperationKind.Read;
    }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| namespace Ydb.Sdk.Retry; | ||
|
|
||
/// <summary>
/// Retry policy driven by YDB and gRPC status codes, with jittered exponential backoff
/// that respects the remaining deadline.
/// </summary>
public sealed class YdbRetryPolicy : IRetryPolicy
{
    private static readonly TimeSpan FastStart = TimeSpan.FromMilliseconds(10);
    private static readonly TimeSpan FastCap = TimeSpan.FromSeconds(2);
    private static readonly TimeSpan SlowStart = TimeSpan.FromSeconds(1);
    private static readonly TimeSpan SlowCap = TimeSpan.FromSeconds(30);
    private const double Mult = 2.0;

    /// <summary>
    /// Decides whether to retry after the failure in <paramref name="ctx"/> and how long to wait.
    /// </summary>
    /// <returns>
    /// A decision with a <c>null</c> delay when the deadline is exhausted, the failure is not
    /// retryable, or the delay would not fit into the remaining deadline; otherwise the delay
    /// together with the session / transport / hedging hints.
    /// </returns>
    public RetryDecision Decide(in RetryContext ctx)
    {
        if (ctx.DeadlineLeft is { } left && left <= TimeSpan.Zero)
        {
            return new(null);
        }

        var (tier, recreate, reset, hedge) = Classify(ctx);
        if (tier == BackoffTier.None)
        {
            return new(null);
        }

        var delay = tier switch
        {
            BackoffTier.Instant => TimeSpan.Zero,
            BackoffTier.Fast => Jitter(ctx.Attempt, FastStart, FastCap),
            BackoffTier.Slow => Jitter(ctx.Attempt, SlowStart, SlowCap),
            _ => TimeSpan.Zero
        };

        // Do not sleep past the deadline; keep a small safety margin.
        if (ctx.DeadlineLeft is { } dl && delay > dl - TimeSpan.FromMilliseconds(50))
        {
            return new(null);
        }

        // Hedging is only offered for idempotent operations, starting from the second attempt.
        return new(delay, recreate, reset, hedge && ctx.IsIdempotent && ctx.Attempt >= 2);
    }

    /// <summary>
    /// Currently a no-op; a place for metrics or adaptive logic later.
    /// </summary>
    public void ReportResult(in RetryContext ctx, bool success)
    {
    }

    /// <summary>
    /// Maps the last failure's status codes to a backoff tier and follow-up hints.
    /// The YDB status code is checked first, then the gRPC status code.
    /// </summary>
    private static (BackoffTier tier, bool recreate, bool reset, bool hedge) Classify(in RetryContext ctx)
    {
        var ydb = ctx.LastFailure?.YdbStatusCode;
        var grpc = ctx.LastFailure?.GrpcStatusCode;

        // Numeric YDB status codes, named for readability.
        const int badSession = 400100;
        const int sessionExpired = 400150;
        const int sessionBusy = 400190;
        const int unavailable = 400050;
        const int aborted = 400040;
        const int overloaded = 400060;
        const int timeout = 400090;
        const int undetermined = 400170;

        switch (ydb)
        {
            case badSession or sessionExpired:
                return (BackoffTier.Instant, recreate: true, reset: false, hedge: false);
            case sessionBusy:
                return (BackoffTier.Fast, recreate: true, reset: false, hedge: true);
            case unavailable or aborted:
                return (BackoffTier.Fast, recreate: false, reset: false, hedge: true);
            case overloaded:
                return (BackoffTier.Slow, recreate: false, reset: false, hedge: false);
            case timeout or undetermined:
                // The outcome is unknown, so only operations that are safe to repeat are retried.
                return ctx.IsIdempotent
                    ? (BackoffTier.Fast, false, false, true)
                    : (BackoffTier.None, false, false, false);
        }

        switch (grpc)
        {
            case (int)Grpc.Core.StatusCode.Unavailable:
                // NOTE(review): retried regardless of idempotency, as before — confirm that a
                // transport-level Unavailable cannot follow a request the server already applied.
                return (BackoffTier.Fast, recreate: false, reset: true, hedge: true);
            case (int)Grpc.Core.StatusCode.DeadlineExceeded:
                // Same reasoning as the YDB timeout above: the request may have been applied,
                // so a non-idempotent operation must not be repeated.
                return ctx.IsIdempotent
                    ? (BackoffTier.Fast, false, true, true)
                    : (BackoffTier.None, false, false, false);
            case (int)Grpc.Core.StatusCode.ResourceExhausted:
                return (BackoffTier.Slow, recreate: false, reset: false, hedge: false);
            case (int)Grpc.Core.StatusCode.Aborted:
                return (BackoffTier.Instant, recreate: false, reset: false, hedge: true);
            default:
                return (BackoffTier.None, false, false, false);
        }
    }

    /// <summary>
    /// Picks a uniformly random delay between <paramref name="start"/> and
    /// min(<paramref name="cap"/>, <paramref name="start"/> * 2^(attempt - 1)).
    /// </summary>
    private static TimeSpan Jitter(int attempt, TimeSpan start, TimeSpan cap)
    {
        var max = Math.Min(cap.TotalMilliseconds, Math.Pow(Mult, attempt - 1) * start.TotalMilliseconds);
        // Keeps the range non-empty when the cap is below the start or the attempt number is below 1.
        if (max < start.TotalMilliseconds)
        {
            max = start.TotalMilliseconds;
        }

        var next = Random.Shared.NextDouble() * (max - start.TotalMilliseconds) + start.TotalMilliseconds;
        return TimeSpan.FromMilliseconds(next);
    }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
я думаю это точно не public опции