diff --git a/src/Ydb.Sdk/src/Ado/YdbCommand.cs b/src/Ydb.Sdk/src/Ado/YdbCommand.cs index 2025991e..65d21004 100644 --- a/src/Ydb.Sdk/src/Ado/YdbCommand.cs +++ b/src/Ydb.Sdk/src/Ado/YdbCommand.cs @@ -241,4 +241,9 @@ await YdbConnection.Session public new async Task ExecuteReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken) => (YdbDataReader)await ExecuteDbDataReaderAsync(behavior, cancellationToken); + + internal Task InternalExecuteDbDataReaderAsync( + CommandBehavior behavior, + CancellationToken cancellationToken) => + ExecuteDbDataReaderAsync(behavior, cancellationToken); } diff --git a/src/Ydb.Sdk/src/Ado/YdbConnection.cs b/src/Ydb.Sdk/src/Ado/YdbConnection.cs index a240e4bc..47eea4b6 100644 --- a/src/Ydb.Sdk/src/Ado/YdbConnection.cs +++ b/src/Ydb.Sdk/src/Ado/YdbConnection.cs @@ -238,6 +238,7 @@ protected override void Dispose(bool disposing) return; if (disposing) Close(); + GC.SuppressFinalize(this); _disposed = true; } @@ -250,6 +251,7 @@ public override async ValueTask DisposeAsync() return; await CloseAsync(); + GC.SuppressFinalize(this); _disposed = true; } diff --git a/src/Ydb.Sdk/src/Ado/YdbDataSource.cs b/src/Ydb.Sdk/src/Ado/YdbDataSource.cs index 2a701ead..09c221f5 100644 --- a/src/Ydb.Sdk/src/Ado/YdbDataSource.cs +++ b/src/Ydb.Sdk/src/Ado/YdbDataSource.cs @@ -1,5 +1,7 @@ #if NET7_0_OR_GREATER using System.Data.Common; +using Ydb.Sdk.Retry; +using Ydb.Sdk.Retry.Classifier; namespace Ydb.Sdk.Ado; @@ -7,21 +9,29 @@ public class YdbDataSource : DbDataSource { private readonly YdbConnectionStringBuilder _ydbConnectionStringBuilder; - public YdbDataSource(YdbConnectionStringBuilder connectionStringBuilder) - { - _ydbConnectionStringBuilder = connectionStringBuilder; - } + private static readonly YdbRetryPolicy DefaultRetryPolicy = new(); + private TimeSpan DefaultTimeout { get; init; } = TimeSpan.FromSeconds(30); + private int MaxRetryAttempts { get; init; } = YdbRetryPolicy.DefaultMaxAttempts; + + private YdbDataSource(YdbConnectionStringBuilder connectionStringBuilder) + : this() => _ydbConnectionStringBuilder = connectionStringBuilder; public YdbDataSource(string connectionString) - { - _ydbConnectionStringBuilder = new YdbConnectionStringBuilder(connectionString); - } + : this(new YdbConnectionStringBuilder(connectionString)) { } public YdbDataSource() { _ydbConnectionStringBuilder = new YdbConnectionStringBuilder(); } + public YdbDataSource(YdbConnectionStringBuilder csb, TimeSpan defaultTimeout, + int maxAttempts = YdbRetryPolicy.DefaultMaxAttempts) + : this(csb) + { + DefaultTimeout = defaultTimeout; + MaxRetryAttempts = maxAttempts; + } + protected override YdbConnection CreateDbConnection() => new(_ydbConnectionStringBuilder); protected override YdbConnection OpenDbConnection() @@ -34,7 +44,14 @@ protected override YdbConnection OpenDbConnection() } catch { - dbConnection.Close(); + try + { + dbConnection.Close(); + } + catch + { + // ignored + } throw; } } @@ -54,7 +71,14 @@ protected override YdbConnection OpenDbConnection() } catch { - await ydbConnection.CloseAsync(); + try + { + await ydbConnection.CloseAsync(); + } + catch + { + // ignored + } throw; } } @@ -64,7 +88,129 @@ protected override YdbConnection OpenDbConnection() protected override async ValueTask DisposeAsyncCore() => await PoolManager.ClearPool(_ydbConnectionStringBuilder.ConnectionString); - protected override void Dispose(bool disposing) => DisposeAsyncCore().AsTask().GetAwaiter().GetResult(); + protected override void Dispose(bool disposing) + { + try { DisposeAsyncCore().AsTask().GetAwaiter().GetResult(); } + catch + { + // ignored + } + } + + public async Task ExecuteWithRetryAsync( + Func> operation, + object? context = null, + TimeSpan? timeout = null, + int? maxAttempts = null, + CancellationToken cancellationToken = default) => + await ExecuteWithRetryAsync( + operation: operation, + context: context, + timeout: timeout, + classifier: null, + maxAttempts: maxAttempts ?? MaxRetryAttempts, + cancellationToken: cancellationToken); + + private async Task ExecuteWithRetryAsync( + Func> operation, + object? context = null, + TimeSpan? timeout = null, + IRetryClassifier? classifier = null, + int? maxAttempts = null, + CancellationToken cancellationToken = default) + { + var (kind, isIdempotent) = InferKindAndIdempotency(context); + + return await RetryExecutor.RunAsync( + op: operation, + policy: DefaultRetryPolicy, + isIdempotent: isIdempotent, + operationKind: kind, + overallTimeout: timeout ?? DefaultTimeout, + classifier: classifier, + maxAttempts: maxAttempts ?? MaxRetryAttempts, + ct: cancellationToken + ).ConfigureAwait(false); + } + + private static (OperationKind kind, bool isIdempotent) InferKindAndIdempotency(object? context) + { + switch (context) + { + case DbCommand cmd: + { + var kw = FirstKeyword(cmd.CommandText); + return kw switch + { + "SELECT" or "SHOW" or "DESCRIBE" => (OperationKind.Read, true), + "INSERT" or "UPDATE" or "DELETE" or "UPSERT" => (OperationKind.Write, false), + "CREATE" or "DROP" or "ALTER" => (OperationKind.Schema, ContainsIfExists(cmd.CommandText)), + _ => (OperationKind.Read, false) + }; + } + case string sqlText: + { + var kw = FirstKeyword(sqlText); + return kw switch + { + "SELECT" or "SHOW" or "DESCRIBE" => (OperationKind.Read, true), + "INSERT" or "UPDATE" or "DELETE" or "UPSERT" => (OperationKind.Write, false), + "CREATE" or "DROP" or "ALTER" => (OperationKind.Schema, ContainsIfExists(sqlText)), + _ => (OperationKind.Write, false) + }; + } + } + + return (OperationKind.Read, false); + } + + private static bool ContainsIfExists(string? sql) + { + if (string.IsNullOrEmpty(sql)) return false; + return sql.IndexOf("IF NOT EXISTS", StringComparison.OrdinalIgnoreCase) >= 0 + || sql.IndexOf("IF EXISTS", StringComparison.OrdinalIgnoreCase) >= 0; + } + + private static string FirstKeyword(string? sql) + { + if (string.IsNullOrWhiteSpace(sql)) return string.Empty; + var s = sql.AsSpan(); + + while (s.Length > 0 && char.IsWhiteSpace(s[0])) s = s[1..]; + + while (true) + { + if (s.StartsWith("--".AsSpan())) + { + var idx = s.IndexOfAny('\r', '\n'); + s = idx >= 0 ? s[(idx + 1)..] : ReadOnlySpan.Empty; + while (s.Length > 0 && char.IsWhiteSpace(s[0])) s = s[1..]; + continue; + } + if (s.StartsWith("/*".AsSpan())) + { + var end = s.IndexOf("*/".AsSpan()); + s = end >= 0 ? s[(end + 2)..] : ReadOnlySpan.Empty; + while (s.Length > 0 && char.IsWhiteSpace(s[0])) s = s[1..]; + continue; + } + break; + } + + while (s.Length > 0 && s[0] == '(') + { + s = s[1..]; + while (s.Length > 0 && char.IsWhiteSpace(s[0])) s = s[1..]; + } + + var i = 0; + while (i < s.Length && char.IsLetter(s[i])) i++; + if (i == 0) return string.Empty; + + var kw = s[..i].ToString().ToUpperInvariant(); + if (kw == "WITH") return "SELECT"; + return kw; + } } #endif diff --git a/src/Ydb.Sdk/src/Retry/Classifier/DefaultRetryClassifier.cs b/src/Ydb.Sdk/src/Retry/Classifier/DefaultRetryClassifier.cs new file mode 100644 index 00000000..3b81f725 --- /dev/null +++ b/src/Ydb.Sdk/src/Retry/Classifier/DefaultRetryClassifier.cs @@ -0,0 +1,155 @@ +using System.Collections.Concurrent; +using System.Diagnostics.CodeAnalysis; +using Grpc.Core; + +namespace Ydb.Sdk.Retry.Classifier; + +internal sealed class DefaultRetryClassifier : IRetryClassifier +{ + public static readonly DefaultRetryClassifier Instance = new(); + + private static readonly string[] MemberNames = { "StatusCode", "Status", "YdbStatusCode", "Code" }; + + private sealed class Accessors + { + public Func? ReadDirect { get; init; } + public Func? ReadResponse { get; init; } + } + + private static readonly ConcurrentDictionary Cache = new(); + + [UnconditionalSuppressMessage("Trimming", "IL2057")] + [UnconditionalSuppressMessage("AOT", "IL3050")] + public Failure? Classify(Exception ex) + { + if (ex is RpcException rx) + return new Failure(rx, null, (int)rx.StatusCode); + + var f = TryClassify(ex); + if (f is not null) return f; + + if (ex is AggregateException aggr) + { + foreach (var inner in aggr.Flatten().InnerExceptions) + { + if (inner is RpcException irx) + return new Failure(ex, null, (int)irx.StatusCode); + f = TryClassify(inner); + if (f is not null) return f; + } + } + + var cur = ex.InnerException; + while (cur is not null) + { + if (cur is RpcException rx2) + return new Failure(ex, null, (int)rx2.StatusCode); + f = TryClassify(cur); + if (f is not null) return f; + cur = cur.InnerException; + } + + return new Failure(ex); + } + + private static Accessors Build(global::System.Type t) + { + static int? ReadIntOrEnum(object? val, global::System.Type type) + { + if (val is null) return null; + if (val is int i) return i; + if (type.IsEnum) return Convert.ToInt32(val); + return null; + } + + Func readResp = obj => + { + try + { + return t.GetProperty("Result", + System.Reflection.BindingFlags.Instance | + System.Reflection.BindingFlags.Public | + System.Reflection.BindingFlags.NonPublic)?.GetValue(obj) + ?? t.GetProperty("Response", + System.Reflection.BindingFlags.Instance | + System.Reflection.BindingFlags.Public | + System.Reflection.BindingFlags.NonPublic)?.GetValue(obj); + } + catch { return null; } + }; + + return new Accessors { ReadDirect = Direct, ReadResponse = readResp }; + + int? Direct(object obj) + { + try + { + foreach (var name in MemberNames) + { + var p = t.GetProperty(name, System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.NonPublic); + if (p is not null) + { + object? v = null; + try + { + v = p.GetValue(obj); + } + catch + { + /* ignore */ + } + + var r = ReadIntOrEnum(v, p.PropertyType); + if (r.HasValue) return r; + } + + var f = t.GetField(name, System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.NonPublic); + if (f is not null) + { + object? v = null; + try + { + v = f.GetValue(obj); + } + catch + { + /* ignore */ + } + + var r = ReadIntOrEnum(v, f.FieldType); + if (r.HasValue) return r; + } + } + } + catch + { + /* ignore */ + } + + return null; + } + } + + private static Failure? TryClassify(Exception ex) + { + if (ex is RpcException rx) + return new Failure(rx, null, (int)rx.StatusCode); + + global::System.Type t = ex.GetType(); + var acc = Cache.GetOrAdd(t, tt => Build(tt)); + + var code = acc.ReadDirect?.Invoke(ex); + if (code.HasValue) return new Failure(ex, code, null); + + var resp = acc.ReadResponse?.Invoke(ex); + if (resp is not null) + { + global::System.Type rt = resp.GetType(); + var racc = Cache.GetOrAdd(rt, Build); + var code2 = racc.ReadDirect?.Invoke(resp); + if (code2.HasValue) return new Failure(ex, code2, null); + } + + return null; + } +} diff --git a/src/Ydb.Sdk/src/Retry/Classifier/IRetryClassifier.cs b/src/Ydb.Sdk/src/Retry/Classifier/IRetryClassifier.cs new file mode 100644 index 00000000..7efeab8f --- /dev/null +++ b/src/Ydb.Sdk/src/Retry/Classifier/IRetryClassifier.cs @@ -0,0 +1,6 @@ +namespace Ydb.Sdk.Retry.Classifier; + +internal interface IRetryClassifier +{ + Failure? Classify(Exception ex); +} diff --git a/src/Ydb.Sdk/src/Retry/IRetryPolicy.cs b/src/Ydb.Sdk/src/Retry/IRetryPolicy.cs new file mode 100644 index 00000000..c90c2831 --- /dev/null +++ b/src/Ydb.Sdk/src/Retry/IRetryPolicy.cs @@ -0,0 +1,8 @@ +namespace Ydb.Sdk.Retry; + +internal interface IRetryPolicy +{ + RetryDecision Decide(in RetryContext ctx); + + void ReportResult(in RetryContext ctx, bool success); +} diff --git a/src/Ydb.Sdk/src/Retry/InMemoryDbCommand.cs b/src/Ydb.Sdk/src/Retry/InMemoryDbCommand.cs new file mode 100644 index 00000000..2958ed98 --- /dev/null +++ b/src/Ydb.Sdk/src/Retry/InMemoryDbCommand.cs @@ -0,0 +1,118 @@ +using System.Data; +using System.Data.Common; +using System.Text; +using Ydb.Sdk.Ado; + +namespace Ydb.Sdk.Retry; + +internal sealed class InMemoryDbCommand : DbCommand +{ + private readonly YdbCommand _inner; + private DataTable? _buffer; + private static int MaxBufferedRows { get; set; } = 100000; + private static long MaxBufferedBytes { get; set; } = 64 * 1024 * 1024; + + public InMemoryDbCommand(YdbCommand inner) => _inner = inner; + + public override string CommandText + { + get => _inner.CommandText; + set => _inner.CommandText = value; + } + + public override int CommandTimeout + { + get => _inner.CommandTimeout; + set => _inner.CommandTimeout = value; + } + + public override CommandType CommandType + { + get => _inner.CommandType; + set => _inner.CommandType = value; + } + + public override UpdateRowSource UpdatedRowSource + { + get => _inner.UpdatedRowSource; + set => _inner.UpdatedRowSource = value; + } + + protected override DbConnection DbConnection + { + get => _inner.Connection; + set => _inner.Connection = (YdbConnection)value; + } + + protected override DbParameterCollection DbParameterCollection => _inner.Parameters; + + protected override DbTransaction? DbTransaction + { + get => _inner.Transaction; + set => _inner.Transaction = (YdbTransaction?)value; + } + + public override bool DesignTimeVisible + { + get => _inner.DesignTimeVisible; + set => _inner.DesignTimeVisible = value; + } + + protected override DbParameter CreateDbParameter() => _inner.CreateParameter(); + + public override void Prepare() => _inner.Prepare(); + public override void Cancel() => _inner.Cancel(); + + public override int ExecuteNonQuery() => throw new NotSupportedException(); + public override object ExecuteScalar() => throw new NotSupportedException(); + protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior) => throw new NotSupportedException(); + + protected override async Task ExecuteDbDataReaderAsync( + CommandBehavior behavior, + CancellationToken cancellationToken) + { + await EnsureBufferAsync(behavior, cancellationToken).ConfigureAwait(false); + return _buffer!.CreateDataReader(); + } + + private async Task EnsureBufferAsync(CommandBehavior behavior, CancellationToken ct) + { + if (_buffer is not null) return; + _buffer = new DataTable(); + await using var rdr = await _inner.InternalExecuteDbDataReaderAsync(behavior, ct) + .ConfigureAwait(false); + _buffer.Load(rdr, LoadOption.OverwriteChanges, null); + + var approx = EstimateSize(_buffer); + if (_buffer.Rows.Count > MaxBufferedRows || approx > MaxBufferedBytes) + throw new InvalidOperationException( + $"The result set is too large to retry in-memory " + + $"(rows={_buffer.Rows.Count}, bytes≈{approx:N0}). " + + $"Either lower fetch size, increase " + + $"`InMemoryDbCommand.MaxBufferedRows/MaxBufferedBytes`, " + + $"or disable retries for this command."); + } + + private static long EstimateSize(DataTable t) + { + long total = 0; + foreach (DataRow row in t.Rows) + foreach (var obj in row.ItemArray) + switch (obj) + { + case byte[] ba: + total += ba.Length; + break; + case string s: + total += Encoding.UTF8.GetByteCount(s); + break; + case DBNull: + break; + default: + total += 8; + break; + } + + return total; + } +} diff --git a/src/Ydb.Sdk/src/Retry/Models.cs b/src/Ydb.Sdk/src/Retry/Models.cs new file mode 100644 index 00000000..8126420d --- /dev/null +++ b/src/Ydb.Sdk/src/Retry/Models.cs @@ -0,0 +1,42 @@ +namespace Ydb.Sdk.Retry; + +internal enum OperationKind +{ + Read, + Write, + Schema, + Stream, + TopicProduce, + TopicConsume, + Discovery +} + +internal enum BackoffTier +{ + None, + Instant, + Fast, + Slow +} + +internal readonly record struct Failure( + Exception Exception, + int? YdbStatusCode = null, + int? GrpcStatusCode = null +); + +internal readonly record struct RetryContext( + int Attempt, + TimeSpan Elapsed, + TimeSpan? DeadlineLeft, + bool IsIdempotent, + OperationKind Operation, + Failure? LastFailure +); + +internal readonly record struct RetryDecision( + TimeSpan? Delay, + bool RecreateSession = false, + bool ResetTransport = false, + bool Hedge = false +); diff --git a/src/Ydb.Sdk/src/Retry/RetryExecutor.cs b/src/Ydb.Sdk/src/Retry/RetryExecutor.cs new file mode 100644 index 00000000..e78b058a --- /dev/null +++ b/src/Ydb.Sdk/src/Retry/RetryExecutor.cs @@ -0,0 +1,201 @@ +using System.Diagnostics; +using Ydb.Sdk.Retry.Classifier; + +namespace Ydb.Sdk.Retry; + +internal static class RetryExecutor +{ + public static async Task RunAsync( + Func> op, + IRetryPolicy policy, + bool isIdempotent, + OperationKind operationKind, + TimeSpan? overallTimeout = null, + Func? recreateSession = null, + Func? resetTransport = null, + IRetryClassifier? classifier = null, + int? maxAttempts = null, + CancellationToken ct = default) + { + classifier ??= DefaultRetryClassifier.Instance; + + var sw = Stopwatch.StartNew(); + using var tmo = overallTimeout is null ? null : new CancellationTokenSource(overallTimeout.Value); + using var linked = tmo is null + ? CancellationTokenSource.CreateLinkedTokenSource(ct) + : CancellationTokenSource.CreateLinkedTokenSource(ct, tmo.Token); + + var attempt = 0; + Exception? last = null; + + static Task StartAttempt(Func> op, CancellationToken token) + { + return op(token); + } + + while (true) + { + attempt++; + if (maxAttempts is { } ma && attempt > ma) + { + policy.ReportResult( + new RetryContext( + attempt, + sw.Elapsed, + GetTimeLeft(overallTimeout, sw), + isIdempotent, + operationKind, + last is not null ? classifier.Classify(last) : null), + success: false); + throw last ?? new TimeoutException("Retry attempts limit reached."); + } + try + { + var res = await op(linked.Token).ConfigureAwait(false); + + policy.ReportResult( + new RetryContext( + attempt, + sw.Elapsed, + GetTimeLeft(overallTimeout, sw), + isIdempotent, + operationKind, + last is not null ? classifier.Classify(last) : null), + true); + + return res; + } + catch (Exception ex) when (!linked.Token.IsCancellationRequested) + { + last = ex; + + var ctx = new RetryContext( + attempt, + sw.Elapsed, + GetTimeLeft(overallTimeout, sw), + isIdempotent, + operationKind, + classifier.Classify(ex) + ); + + var decision = policy.Decide(ctx); + + if (decision.Delay is null) + { + policy.ReportResult(ctx, false); + throw; + } + + if (decision.ResetTransport && resetTransport is not null) + await resetTransport().ConfigureAwait(false); + + if (decision.Hedge && ctx.IsIdempotent) + { + var remaining = GetTimeLeft(overallTimeout, sw); + if (remaining is { } rem && rem > TimeSpan.Zero) + { + using var winnerCts = CancellationTokenSource.CreateLinkedTokenSource(linked.Token); + + using var hedgeCts = CancellationTokenSource.CreateLinkedTokenSource(winnerCts.Token); + hedgeCts.CancelAfter(TimeSpan.FromMilliseconds(Math.Max(1, rem.TotalMilliseconds * 0.7))); + + var hedgedTask = StartAttempt(op, hedgeCts.Token); + + var primaryTask = Task.Run(async () => + { + if (decision.Delay is { } d && d > TimeSpan.Zero) + { + if (ctx.DeadlineLeft is { } left && d >= left - TimeSpan.FromMilliseconds(50)) + throw new TimeoutException("Retry delay exceeds remaining budget."); + + await Task.Delay(d, linked.Token).ConfigureAwait(false); + + if (linked.IsCancellationRequested) + throw new TimeoutException("Retry budget exceeded."); + } + return await StartAttempt(op, winnerCts.Token).ConfigureAwait(false); + }, winnerCts.Token); + + try + { + var first = await Task.WhenAny(hedgedTask, primaryTask).ConfigureAwait(false); + if (first.Status == TaskStatus.RanToCompletion) + { + var ok = await first.ConfigureAwait(false); + try { winnerCts.Cancel(); } catch { /* ignore */ } + policy.ReportResult( + new RetryContext( + attempt + 1, + sw.Elapsed, + GetTimeLeft(overallTimeout, sw), + isIdempotent, + operationKind, + classifier.Classify(last)), + true); + return ok; + } + + var second = ReferenceEquals(first, hedgedTask) ? primaryTask : hedgedTask; + try + { + var ok2 = await second.ConfigureAwait(false); + try { winnerCts.Cancel(); } catch { /* ignore */ } + policy.ReportResult( + new RetryContext( + attempt + 1, + sw.Elapsed, + GetTimeLeft(overallTimeout, sw), + isIdempotent, + operationKind, + classifier.Classify(last)), + true); + return ok2; + } + catch + { + // ignored + } + } + finally + { + try { winnerCts.Cancel(); } + catch + { + // ignored + } + + _ = Task.WhenAll( + hedgedTask.ContinueWith(_ => { }, TaskContinuationOptions.ExecuteSynchronously), + primaryTask.ContinueWith(_ => { }, TaskContinuationOptions.ExecuteSynchronously) + ); + } + continue; + } + } + + if (decision.RecreateSession && recreateSession is not null) + await recreateSession().ConfigureAwait(false); + + if (decision.Delay is { } delay && delay > TimeSpan.Zero) + await Task.Delay(delay, linked.Token).ConfigureAwait(false); + } + catch (OperationCanceledException oce) when (linked.Token.IsCancellationRequested) + { + policy.ReportResult( + new RetryContext( + attempt, + sw.Elapsed, + GetTimeLeft(overallTimeout, sw), + isIdempotent, + operationKind, + classifier.Classify(oce) + ), + success: false); + throw; + } + } + } + + private static TimeSpan? GetTimeLeft(TimeSpan? overall, Stopwatch sw) => + overall is null ? null : overall.Value - sw.Elapsed; +} diff --git a/src/Ydb.Sdk/src/Retry/RetryingInterceptor.cs b/src/Ydb.Sdk/src/Retry/RetryingInterceptor.cs new file mode 100644 index 00000000..7f7049a8 --- /dev/null +++ b/src/Ydb.Sdk/src/Retry/RetryingInterceptor.cs @@ -0,0 +1,111 @@ +using Grpc.Core; +using Grpc.Core.Interceptors; + +using GrpcStatus = Grpc.Core.Status; +using GrpcMetadata = Grpc.Core.Metadata; +using GrpcStatusCode = Grpc.Core.StatusCode; +using GrpcIMethod = Grpc.Core.IMethod; + +using Ydb.Sdk.Retry.Classifier; + +namespace Ydb.Sdk.Retry; + +internal sealed class RetryingInterceptor : Interceptor +{ + private readonly IRetryPolicy _policy; + private readonly TimeSpan _defaultDeadline; + private readonly IRetryClassifier _classifier; + private readonly int? _maxAttempts; + + public Func? ResetTransport { get; init; } + + public RetryingInterceptor( + IRetryPolicy policy, + TimeSpan? defaultDeadline = null, + IRetryClassifier? classifier = null, + int? maxAttempts = null) + { + _policy = policy ?? throw new ArgumentNullException(nameof(policy)); + _defaultDeadline = defaultDeadline ?? TimeSpan.FromSeconds(30); + _classifier = classifier ?? DefaultRetryClassifier.Instance; + _maxAttempts = maxAttempts ?? YdbRetryPolicy.DefaultMaxAttempts; + } + + public override AsyncUnaryCall AsyncUnaryCall( + TRequest request, + ClientInterceptorContext context, + AsyncUnaryCallContinuation continuation) + { + bool isRead = GuessIdempotency(context.Method); + var kind = Classify(context.Method); + + var budget = context.Options.Deadline is { } dl + ? dl - DateTime.UtcNow + : _defaultDeadline; + + var startedAt = DateTime.UtcNow; + + Func statusProvider = () => new GrpcStatus(GrpcStatusCode.OK, ""); + Func trailersProvider = () => new GrpcMetadata(); + Action disposeProvider = () => { }; + + Task headersTask = Task.FromResult(new GrpcMetadata()); + + async Task Do(CancellationToken ct) + { + var elapsed = DateTime.UtcNow - startedAt; + var remaining = budget - elapsed; + if (remaining <= TimeSpan.FromMilliseconds(50)) + throw new RpcException(new GrpcStatus(GrpcStatusCode.DeadlineExceeded, "retry budget exceeded")); + + var opts = context.Options + .WithCancellationToken(ct) + .WithDeadline(DateTime.UtcNow + remaining); + + var ctx2 = new ClientInterceptorContext(context.Method, context.Host, opts); + var call = continuation(request, ctx2); + + statusProvider = call.GetStatus; + trailersProvider = call.GetTrailers; + disposeProvider = call.Dispose; + headersTask = call.ResponseHeadersAsync; + + return await call.ResponseAsync.ConfigureAwait(false); + } + + var responseTask = RetryExecutor.RunAsync( + op: Do, + policy: _policy, + isIdempotent: isRead, + operationKind: kind, + overallTimeout: null, + recreateSession: null, + resetTransport: ResetTransport, + classifier: _classifier, + maxAttempts: _maxAttempts + ); + + return new AsyncUnaryCall( + responseTask, + headersTask, + () => statusProvider(), + () => trailersProvider(), + () => disposeProvider() + ); + } + + private static bool GuessIdempotency(GrpcIMethod method) => + method.Type == MethodType.Unary && + (method.FullName.Contains("Read", StringComparison.OrdinalIgnoreCase) + || method.FullName.Contains("Describe", StringComparison.OrdinalIgnoreCase) + || method.FullName.Contains("Get", StringComparison.OrdinalIgnoreCase)); + + private static OperationKind Classify(GrpcIMethod method) + { + var n = method.FullName; + if (n.Contains("Topic", StringComparison.OrdinalIgnoreCase)) return OperationKind.Stream; + if (n.Contains("Scheme", StringComparison.OrdinalIgnoreCase)) return OperationKind.Schema; + if (n.Contains("Discovery", StringComparison.OrdinalIgnoreCase)) return OperationKind.Discovery; + return OperationKind.Read; + } +} diff --git a/src/Ydb.Sdk/src/Retry/YdbRetryPolicy.cs b/src/Ydb.Sdk/src/Retry/YdbRetryPolicy.cs new file mode 100644 index 00000000..23a3796e --- /dev/null +++ b/src/Ydb.Sdk/src/Retry/YdbRetryPolicy.cs @@ -0,0 +1,103 @@ +using GrpcStatusCode = Grpc.Core.StatusCode; + +namespace Ydb.Sdk.Retry; + +internal sealed class YdbRetryPolicy : IRetryPolicy +{ + public const int DefaultMaxAttempts = 10; + + private static readonly TimeSpan FastStart = TimeSpan.FromMilliseconds(10); + private static readonly TimeSpan FastCap = TimeSpan.FromSeconds(2); + private static readonly TimeSpan SlowStart = TimeSpan.FromSeconds(1); + private static readonly TimeSpan SlowCap = TimeSpan.FromSeconds(30); + + private const int HedgeStartAttempt = 3; + + private static readonly Random _rnd = Random.Shared; + + public RetryDecision Decide(in RetryContext ctx) + { + if (ctx.Operation == OperationKind.Stream) + return new RetryDecision(null); + + if (ctx.DeadlineLeft is { } left && left <= TimeSpan.Zero) + return new RetryDecision(null); + + var (tier, recreate, reset, hedge) = Classify(ctx); + if (tier == BackoffTier.None) + return new RetryDecision(null); + + var delay = tier switch + { + BackoffTier.Instant => TimeSpan.Zero, + BackoffTier.Fast => JitterDecorrelated(ctx.Attempt, FastStart, FastCap), + BackoffTier.Slow => JitterDecorrelated(ctx.Attempt, SlowStart, SlowCap), + _ => TimeSpan.Zero + }; + + if (ctx.DeadlineLeft is { } dl && delay > dl - TimeSpan.FromMilliseconds(50)) + return new RetryDecision(null); + + return new RetryDecision( + delay, + recreate, + reset, + hedge && ctx.IsIdempotent && ctx.Attempt >= HedgeStartAttempt); + } + + public void ReportResult(in RetryContext ctx, bool success) + { + System.Diagnostics.Debug.WriteLine( + $"[Retry] op={ctx.Operation} attempt={ctx.Attempt} success={success} " + + $"delayLeft={ctx.DeadlineLeft?.TotalMilliseconds:F0}ms " + + $"exception={ctx.LastFailure?.Exception.GetType().Name ?? "—"}"); + } + + private (BackoffTier tier, bool recreate, bool reset, bool hedge) Classify(in RetryContext ctx) + { + var ydb = ctx.LastFailure?.YdbStatusCode; + var grpc = ctx.LastFailure?.GrpcStatusCode; + + const int BAD_SESSION = 400100; + const int SESSION_EXPIRED = 400150; + const int SESSION_BUSY = 400190; + const int UNAVAILABLE = 400050; + const int ABORTED = 400040; + const int OVERLOADED = 400060; + const int TIMEOUT = 400090; + const int UNDETERMINED = 400170; + + return ydb switch + { + BAD_SESSION or SESSION_EXPIRED => (BackoffTier.Instant, recreate: true, reset: false, hedge: false), + SESSION_BUSY => (BackoffTier.Fast, recreate: true, reset: false, hedge: true), + UNAVAILABLE or ABORTED => (BackoffTier.Fast, recreate: false, reset: false, hedge: true), + OVERLOADED => (BackoffTier.Slow, recreate: false, reset: false, hedge: false), + TIMEOUT or UNDETERMINED => ctx.IsIdempotent + ? (BackoffTier.Fast, false, false, true) + : (BackoffTier.None, false, false, false), + _ => grpc switch + { + (int)GrpcStatusCode.Unavailable or (int)GrpcStatusCode.DeadlineExceeded => (BackoffTier.Fast, + recreate: false, reset: true, hedge: true), + (int)GrpcStatusCode.ResourceExhausted => (BackoffTier.Slow, recreate: false, reset: false, + hedge: false), + (int)GrpcStatusCode.Aborted => (BackoffTier.Instant, recreate: false, reset: false, hedge: true), + _ => (BackoffTier.None, false, false, false) + } + }; + } + + private static TimeSpan JitterDecorrelated(int attempt, TimeSpan start, TimeSpan cap) + { + if (attempt <= 1) return TimeSpan.Zero; + + var safeAttempt = Math.Min(attempt, 30); + var exp = Math.Pow(2, safeAttempt - 2); + var maxMs = Math.Min(cap.TotalMilliseconds, start.TotalMilliseconds * exp); + var minMs = start.TotalMilliseconds; + var jitterMs = _rnd.NextDouble() * (maxMs - minMs); + + return TimeSpan.FromMilliseconds(minMs + jitterMs); + } +}