Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Ydb.Sdk/src/Ado/YdbCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,9 @@ await YdbConnection.Session
public new async Task<YdbDataReader> ExecuteReaderAsync(CommandBehavior behavior,
CancellationToken cancellationToken) =>
(YdbDataReader)await ExecuteDbDataReaderAsync(behavior, cancellationToken);

internal Task<DbDataReader> InternalExecuteDbDataReaderAsync(
CommandBehavior behavior,
CancellationToken cancellationToken) =>
ExecuteDbDataReaderAsync(behavior, cancellationToken);
}
2 changes: 2 additions & 0 deletions src/Ydb.Sdk/src/Ado/YdbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ protected override void Dispose(bool disposing)
return;
if (disposing)
Close();
GC.SuppressFinalize(this);
_disposed = true;
}

Expand All @@ -250,6 +251,7 @@ public override async ValueTask DisposeAsync()
return;

await CloseAsync();
GC.SuppressFinalize(this);
_disposed = true;
}

Expand Down
166 changes: 156 additions & 10 deletions src/Ydb.Sdk/src/Ado/YdbDataSource.cs
Original file line number Diff line number Diff line change
@@ -1,27 +1,37 @@
#if NET7_0_OR_GREATER
using System.Data.Common;
using Ydb.Sdk.Retry;
using Ydb.Sdk.Retry.Classifier;

namespace Ydb.Sdk.Ado;

public class YdbDataSource : DbDataSource
{
private readonly YdbConnectionStringBuilder _ydbConnectionStringBuilder;

public YdbDataSource(YdbConnectionStringBuilder connectionStringBuilder)
{
_ydbConnectionStringBuilder = connectionStringBuilder;
}
private static readonly YdbRetryPolicy DefaultRetryPolicy = new();
private TimeSpan DefaultTimeout { get; init; } = TimeSpan.FromSeconds(30);
private int MaxRetryAttempts { get; init; } = YdbRetryPolicy.DefaultMaxAttempts;

private YdbDataSource(YdbConnectionStringBuilder connectionStringBuilder)
: this() => _ydbConnectionStringBuilder = connectionStringBuilder;

public YdbDataSource(string connectionString)
{
_ydbConnectionStringBuilder = new YdbConnectionStringBuilder(connectionString);
}
: this(new YdbConnectionStringBuilder(connectionString)) { }

public YdbDataSource()
{
_ydbConnectionStringBuilder = new YdbConnectionStringBuilder();
}

public YdbDataSource(YdbConnectionStringBuilder csb, TimeSpan defaultTimeout,
int maxAttempts = YdbRetryPolicy.DefaultMaxAttempts)
: this(csb)
{
DefaultTimeout = defaultTimeout;
MaxRetryAttempts = maxAttempts;
}

protected override YdbConnection CreateDbConnection() => new(_ydbConnectionStringBuilder);

protected override YdbConnection OpenDbConnection()
Expand All @@ -34,7 +44,14 @@ protected override YdbConnection OpenDbConnection()
}
catch
{
dbConnection.Close();
try
{
dbConnection.Close();
}
catch
{
// ignored
}
throw;
}
}
Expand All @@ -54,7 +71,14 @@ protected override YdbConnection OpenDbConnection()
}
catch
{
await ydbConnection.CloseAsync();
try
{
await ydbConnection.CloseAsync();
}
catch
{
// ignored
}
throw;
}
}
Expand All @@ -64,7 +88,129 @@ protected override YdbConnection OpenDbConnection()
protected override async ValueTask DisposeAsyncCore() =>
await PoolManager.ClearPool(_ydbConnectionStringBuilder.ConnectionString);

protected override void Dispose(bool disposing) => DisposeAsyncCore().AsTask().GetAwaiter().GetResult();
protected override void Dispose(bool disposing)
{
try { DisposeAsyncCore().AsTask().GetAwaiter().GetResult(); }
catch
{
// ignored
}
}

public async Task<TResult> ExecuteWithRetryAsync<TResult>(
Func<CancellationToken, Task<TResult>> operation,
object? context = null,
TimeSpan? timeout = null,
int? maxAttempts = null,
CancellationToken cancellationToken = default) =>
await ExecuteWithRetryAsync(
operation: operation,
context: context,
timeout: timeout,
classifier: null,
maxAttempts: maxAttempts ?? MaxRetryAttempts,
cancellationToken: cancellationToken);

private async Task<TResult> ExecuteWithRetryAsync<TResult>(
Func<CancellationToken, Task<TResult>> operation,
object? context = null,
TimeSpan? timeout = null,
IRetryClassifier? classifier = null,
int? maxAttempts = null,
CancellationToken cancellationToken = default)
{
var (kind, isIdempotent) = InferKindAndIdempotency(context);

return await RetryExecutor.RunAsync(
op: operation,
policy: DefaultRetryPolicy,
isIdempotent: isIdempotent,
operationKind: kind,
overallTimeout: timeout ?? DefaultTimeout,
classifier: classifier,
maxAttempts: maxAttempts ?? MaxRetryAttempts,
ct: cancellationToken
).ConfigureAwait(false);
}

private static (OperationKind kind, bool isIdempotent) InferKindAndIdempotency(object? context)
{
switch (context)
{
case DbCommand cmd:
{
var kw = FirstKeyword(cmd.CommandText);
return kw switch
{
"SELECT" or "SHOW" or "DESCRIBE" => (OperationKind.Read, true),
"INSERT" or "UPDATE" or "DELETE" or "UPSERT" => (OperationKind.Write, false),
"CREATE" or "DROP" or "ALTER" => (OperationKind.Schema, ContainsIfExists(cmd.CommandText)),
_ => (OperationKind.Read, false)
};
}
case string sqlText:
{
var kw = FirstKeyword(sqlText);
return kw switch
{
"SELECT" or "SHOW" or "DESCRIBE" => (OperationKind.Read, true),
"INSERT" or "UPDATE" or "DELETE" or "UPSERT" => (OperationKind.Write, false),
"CREATE" or "DROP" or "ALTER" => (OperationKind.Schema, ContainsIfExists(sqlText)),
_ => (OperationKind.Write, false)
};
}
}

return (OperationKind.Read, false);
}

private static bool ContainsIfExists(string? sql)
{
if (string.IsNullOrEmpty(sql)) return false;
return sql.IndexOf("IF NOT EXISTS", StringComparison.OrdinalIgnoreCase) >= 0
|| sql.IndexOf("IF EXISTS", StringComparison.OrdinalIgnoreCase) >= 0;
}

private static string FirstKeyword(string? sql)
{
if (string.IsNullOrWhiteSpace(sql)) return string.Empty;
var s = sql.AsSpan();

while (s.Length > 0 && char.IsWhiteSpace(s[0])) s = s[1..];

while (true)
{
if (s.StartsWith("--".AsSpan()))
{
var idx = s.IndexOfAny('\r', '\n');
s = idx >= 0 ? s[(idx + 1)..] : ReadOnlySpan<char>.Empty;
while (s.Length > 0 && char.IsWhiteSpace(s[0])) s = s[1..];
continue;
}
if (s.StartsWith("/*".AsSpan()))
{
var end = s.IndexOf("*/".AsSpan());
s = end >= 0 ? s[(end + 2)..] : ReadOnlySpan<char>.Empty;
while (s.Length > 0 && char.IsWhiteSpace(s[0])) s = s[1..];
continue;
}
break;
}

while (s.Length > 0 && s[0] == '(')
{
s = s[1..];
while (s.Length > 0 && char.IsWhiteSpace(s[0])) s = s[1..];
}

var i = 0;
while (i < s.Length && char.IsLetter(s[i])) i++;
if (i == 0) return string.Empty;

var kw = s[..i].ToString().ToUpperInvariant();
if (kw == "WITH") return "SELECT";
return kw;
}
}

#endif
155 changes: 155 additions & 0 deletions src/Ydb.Sdk/src/Retry/Classifier/DefaultRetryClassifier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using Grpc.Core;

namespace Ydb.Sdk.Retry.Classifier;

internal sealed class DefaultRetryClassifier : IRetryClassifier
{
public static readonly DefaultRetryClassifier Instance = new();

private static readonly string[] MemberNames = { "StatusCode", "Status", "YdbStatusCode", "Code" };

private sealed class Accessors
{
public Func<object, int?>? ReadDirect { get; init; }
public Func<object, object?>? ReadResponse { get; init; }
}

private static readonly ConcurrentDictionary<global::System.Type, Accessors> Cache = new();

[UnconditionalSuppressMessage("Trimming", "IL2057")]
[UnconditionalSuppressMessage("AOT", "IL3050")]
public Failure? Classify(Exception ex)
{
if (ex is RpcException rx)
return new Failure(rx, null, (int)rx.StatusCode);

var f = TryClassify(ex);
if (f is not null) return f;

if (ex is AggregateException aggr)
{
foreach (var inner in aggr.Flatten().InnerExceptions)
{
if (inner is RpcException irx)
return new Failure(ex, null, (int)irx.StatusCode);
f = TryClassify(inner);
if (f is not null) return f;
}
}

var cur = ex.InnerException;
while (cur is not null)
{
if (cur is RpcException rx2)
return new Failure(ex, null, (int)rx2.StatusCode);
f = TryClassify(cur);
if (f is not null) return f;
cur = cur.InnerException;
}

return new Failure(ex);
}

private static Accessors Build(global::System.Type t)
{
static int? ReadIntOrEnum(object? val, global::System.Type type)
{
if (val is null) return null;
if (val is int i) return i;
if (type.IsEnum) return Convert.ToInt32(val);
return null;
}

Func<object, object?> readResp = obj =>
{
try
{
return t.GetProperty("Result",
System.Reflection.BindingFlags.Instance |
System.Reflection.BindingFlags.Public |
System.Reflection.BindingFlags.NonPublic)?.GetValue(obj)
?? t.GetProperty("Response",
System.Reflection.BindingFlags.Instance |
System.Reflection.BindingFlags.Public |
System.Reflection.BindingFlags.NonPublic)?.GetValue(obj);
}
catch { return null; }
};

return new Accessors { ReadDirect = Direct, ReadResponse = readResp };

int? Direct(object obj)
{
try
{
foreach (var name in MemberNames)
{
var p = t.GetProperty(name, System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.NonPublic);
if (p is not null)
{
object? v = null;
try
{
v = p.GetValue(obj);
}
catch
{
/* ignore */
}

var r = ReadIntOrEnum(v, p.PropertyType);
if (r.HasValue) return r;
}

var f = t.GetField(name, System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.NonPublic);
if (f is not null)
{
object? v = null;
try
{
v = f.GetValue(obj);
}
catch
{
/* ignore */
}

var r = ReadIntOrEnum(v, f.FieldType);
if (r.HasValue) return r;
}
}
}
catch
{
/* ignore */
}

return null;
}
}

private static Failure? TryClassify(Exception ex)
{
if (ex is RpcException rx)
return new Failure(rx, null, (int)rx.StatusCode);

global::System.Type t = ex.GetType();
var acc = Cache.GetOrAdd(t, tt => Build(tt));

var code = acc.ReadDirect?.Invoke(ex);
if (code.HasValue) return new Failure(ex, code, null);

var resp = acc.ReadResponse?.Invoke(ex);
if (resp is not null)
{
global::System.Type rt = resp.GetType();
var racc = Cache.GetOrAdd(rt, Build);
var code2 = racc.ReadDirect?.Invoke(resp);
if (code2.HasValue) return new Failure(ex, code2, null);
}

return null;
}
}
6 changes: 6 additions & 0 deletions src/Ydb.Sdk/src/Retry/Classifier/IRetryClassifier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Ydb.Sdk.Retry.Classifier;

internal interface IRetryClassifier
{
Failure? Classify(Exception ex);
}
Loading
Loading