Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 7 additions & 28 deletions slo/src/AdoNet/SloTableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using Internal;
using Microsoft.Extensions.Logging;
using Polly;
using Prometheus;
using Ydb.Sdk;
using Ydb.Sdk.Ado;

Expand All @@ -12,13 +11,7 @@ public class SloTableContext : SloTableContext<YdbDataSource>
{
private readonly AsyncPolicy _policy = Policy.Handle<YdbException>(exception => exception.IsTransient)
.WaitAndRetryAsync(10, attempt => TimeSpan.FromMilliseconds(attempt * 10),
(e, _, _, context) =>
{
var errorsTotal = (Counter)context["errorsTotal"];

Logger.LogWarning(e, "Failed read / write operation");
errorsTotal?.WithLabels(((YdbException)e).Code.StatusName(), "retried").Inc();
});
(e, _, _, _) => { Logger.LogWarning(e, "Failed read / write operation"); });

protected override string Job => "AdoNet";

Expand All @@ -33,7 +26,7 @@ protected override async Task Create(YdbDataSource client, int operationTimeout)
{
CommandText = $"""
CREATE TABLE `{SloTable.Name}` (
Guid UUID,
Guid Uuid,
Id Int32,
PayloadStr Text,
PayloadDouble Double,
Expand All @@ -49,24 +42,17 @@ PRIMARY KEY (Guid, Id)
protected override async Task<(int, StatusCode)> Save(
YdbDataSource client,
SloTable sloTable,
int writeTimeout,
Counter? errorsTotal = null
int writeTimeout
)
{
var context = new Context();
if (errorsTotal != null)
{
context["errorsTotal"] = errorsTotal;
}

var policyResult = await _policy.ExecuteAndCaptureAsync(async _ =>
{
await using var ydbConnection = await client.OpenConnectionAsync();

var ydbCommand = new YdbCommand(ydbConnection)
{
CommandText = $"""
INSERT INTO `{SloTable.Name}` (Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp)
UPSERT INTO `{SloTable.Name}` (Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp)
VALUES (@Guid, @Id, @PayloadStr, @PayloadDouble, @PayloadTimestamp)
""",
CommandTimeout = writeTimeout,
Expand Down Expand Up @@ -106,7 +92,7 @@ PRIMARY KEY (Guid, Id)
};

await ydbCommand.ExecuteNonQueryAsync();
}, context);
}, new Context());


return (policyResult.Context.TryGetValue("RetryCount", out var countAttempts) ? (int)countAttempts : 1,
Expand All @@ -116,16 +102,9 @@ PRIMARY KEY (Guid, Id)
protected override async Task<(int, StatusCode, object?)> Select(
YdbDataSource client,
(Guid Guid, int Id) select,
int readTimeout,
Counter? errorsTotal = null
int readTimeout
)
{
var context = new Context();
if (errorsTotal != null)
{
context["errorsTotal"] = errorsTotal;
}

var attempts = 0;
var policyResult = await _policy.ExecuteAndCaptureAsync(async _ =>
{
Expand All @@ -147,7 +126,7 @@ PRIMARY KEY (Guid, Id)
};

return await ydbCommand.ExecuteScalarAsync();
}, context);
}, new Context());

return (attempts, ((YdbException)policyResult.FinalException)?.Code ?? StatusCode.Success, policyResult.Result);
}
Expand Down
8 changes: 3 additions & 5 deletions slo/src/EF/SloTableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using Internal;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Prometheus;
using Ydb.Sdk;

namespace EF;
Expand All @@ -27,8 +26,8 @@ int operationTimeout
protected override async Task<(int, StatusCode)> Save(
PooledDbContextFactory<TableDbContext> client,
SloTable sloTable,
int writeTimeout,
Counter? errorsTotal = null)
int writeTimeout
)
{
await using var dbContext = await client.CreateDbContextAsync();
dbContext.SloEntities.Add(sloTable);
Expand All @@ -40,8 +39,7 @@ int operationTimeout
protected override async Task<(int, StatusCode, object?)> Select(
PooledDbContextFactory<TableDbContext> client,
(Guid Guid, int Id) select,
int readTimeout,
Counter? errorsTotal = null
int readTimeout
)
{
await using var dbContext = await client.CreateDbContextAsync();
Expand Down
18 changes: 8 additions & 10 deletions slo/src/Internal/SloTableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public async Task Run(RunConfig runConfig)
return;

Task ShootingTask(RateLimiter rateLimitPolicy, string operationType,
Func<T, RunConfig, Counter?, Task<(int, StatusCode)>> action)
Func<T, RunConfig, Task<(int, StatusCode)>> action)
{
var metricFactory = Metrics.WithLabels(new Dictionary<string, string>
{
Expand Down Expand Up @@ -219,7 +219,7 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string operationType,
{
pendingOperations.Inc();
var sw = Stopwatch.StartNew();
var (attempts, statusCode) = await action(client, runConfig, errorsTotal);
var (attempts, statusCode) = await action(client, runConfig);
sw.Stop();

retryAttempts.Set(attempts);
Expand Down Expand Up @@ -251,15 +251,13 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string operationType,
}

// return attempt count & StatusCode operation
protected abstract Task<(int, StatusCode)> Save(T client, SloTable sloTable, int writeTimeout,
Counter? errorsTotal = null);
protected abstract Task<(int, StatusCode)> Save(T client, SloTable sloTable, int writeTimeout);

protected abstract Task<(int, StatusCode, object?)> Select(T client, (Guid Guid, int Id) select, int readTimeout,
Counter? errorsTotal = null);
protected abstract Task<(int, StatusCode, object?)> Select(T client, (Guid Guid, int Id) select, int readTimeout);

protected abstract Task<int> SelectCount(T client);

private Task<(int, StatusCode)> Save(T client, Config config, Counter? errorsTotal = null)
private Task<(int, StatusCode)> Save(T client, Config config)
{
const int minSizeStr = 20;
const int maxSizeStr = 40;
Expand All @@ -276,14 +274,14 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string operationType,
PayloadTimestamp = DateTime.Now
};

return Save(client, sloTable, config.WriteTimeout, errorsTotal);
return Save(client, sloTable, config.WriteTimeout);
}

private async Task<(int, StatusCode)> Select(T client, RunConfig config, Counter? errorsTotal = null)
private async Task<(int, StatusCode)> Select(T client, RunConfig config)
{
var id = Random.Shared.Next(_maxId);
var (attempts, code, _) =
await Select(client, new ValueTuple<Guid, int>(GuidFromInt(id), id), config.ReadTimeout, errorsTotal);
await Select(client, new ValueTuple<Guid, int>(GuidFromInt(id), id), config.ReadTimeout);

return (attempts, code);
}
Expand Down
Loading