diff --git a/.github/workflows/slo-topic.yaml b/.github/workflows/slo-topic.yaml index fde27b39..919bfd30 100644 --- a/.github/workflows/slo-topic.yaml +++ b/.github/workflows/slo-topic.yaml @@ -26,11 +26,11 @@ jobs: - name: Prepare SLO Database run: | cd slo/src/TopicService - dotnet run create grpc://localhost:2135 /Root/testdb + dotnet run create "Host=localhost;Port=2135;Database=/Root/testdb" - name: Run SLO Tests continue-on-error: true run: | cd slo/src/TopicService - dotnet run run grpc://localhost:2135 /Root/testdb \ + dotnet run run "Host=localhost;Port=2135;Database=/Root/testdb" \ --write-rps 50 \ --time 600 \ No newline at end of file diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index 34cd9511..88e1f578 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -31,6 +31,7 @@ jobs: matrix: workload: - AdoNet + - EF concurrency: group: slo-${{ github.ref }}-${{ matrix.workload }} @@ -56,12 +57,12 @@ jobs: - name: Prepare SLO Database run: | cd slo/src/${{ matrix.workload }} - dotnet run create grpc://localhost:2135 /Root/testdb + dotnet run create "Host=localhost;Port=2135;Database=/Root/testdb" - name: Run SLO Tests run: | cd slo/src/${{ matrix.workload }} - dotnet run run grpc://localhost:2135 /Root/testdb \ + dotnet run run "Host=localhost;Port=2135;Database=/Root/testdb" \ --prom-pgw http://localhost:9091 \ --report-period 250 \ --time ${{inputs.slo_workload_duration_seconds || 600 }} \ diff --git a/slo/playground/configs/compose.yaml b/slo/playground/configs/compose.yaml index e3fe38e5..f7269f51 100644 --- a/slo/playground/configs/compose.yaml +++ b/slo/playground/configs/compose.yaml @@ -5,7 +5,7 @@ x-runtime: &runtime network_mode: host x-ydb-node: &ydb-node - image: cr.yandex/crptqonuodf51kdj7a7d/ydb:24.3.11.13 + image: cr.yandex/crptqonuodf51kdj7a7d/ydb:24.4.4.2 restart: always <<: *runtime volumes: diff --git a/slo/src/AdoNet/SloTableContext.cs b/slo/src/AdoNet/SloTableContext.cs index 25df6fd1..de0c572f 100644 --- a/slo/src/AdoNet/SloTableContext.cs +++ b/slo/src/AdoNet/SloTableContext.cs @@ -1,10 +1,10 @@ +using System.Data; using Internal; using Microsoft.Extensions.Logging; using Polly; using Prometheus; using Ydb.Sdk; using Ydb.Sdk.Ado; -using Ydb.Sdk.Value; namespace AdoNet; @@ -22,17 +22,36 @@ public class SloTableContext : SloTableContext protected override string Job => "AdoNet"; - protected override async Task Create(YdbDataSource client, string createTableSql, int operationTimeout) + protected override YdbDataSource CreateClient(Config config) => new( + new YdbConnectionStringBuilder(config.ConnectionString) { LoggerFactory = ISloContext.Factory } + ); + + protected override async Task Create(YdbDataSource client, int operationTimeout) { await using var ydbConnection = await client.OpenConnectionAsync(); - await new YdbCommand(ydbConnection) - { CommandText = createTableSql, CommandTimeout = operationTimeout } - .ExecuteNonQueryAsync(); + { + CommandText = $""" + CREATE TABLE `{SloTable.Name}` ( + Guid UUID, + Id Int32, + PayloadStr Text, + PayloadDouble Double, + PayloadTimestamp Timestamp, + PRIMARY KEY (Guid, Id) + ); + {SloTable.Options} + """, + CommandTimeout = operationTimeout + }.ExecuteNonQueryAsync(); } - protected override async Task<(int, StatusCode)> Upsert(YdbDataSource dataSource, string upsertSql, - Dictionary parameters, int writeTimeout, Counter? errorsTotal = null) + protected override async Task<(int, StatusCode)> Save( + YdbDataSource client, + SloTable sloTable, + int writeTimeout, + Counter? errorsTotal = null + ) { var context = new Context(); if (errorsTotal != null) @@ -42,15 +61,49 @@ protected override async Task Create(YdbDataSource client, string createTableSql var policyResult = await _policy.ExecuteAndCaptureAsync(async _ => { - await using var ydbConnection = await dataSource.OpenConnectionAsync(); + await using var ydbConnection = await client.OpenConnectionAsync(); var ydbCommand = new YdbCommand(ydbConnection) - { CommandText = upsertSql, CommandTimeout = writeTimeout }; - - foreach (var (key, value) in parameters) { - ydbCommand.Parameters.AddWithValue(key, value); - } + CommandText = $""" + INSERT INTO `{SloTable.Name}` (Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp) + VALUES (@Guid, @Id, @PayloadStr, @PayloadDouble, @PayloadTimestamp) + """, + CommandTimeout = writeTimeout, + Parameters = + { + new YdbParameter + { + DbType = DbType.Guid, + ParameterName = "Guid", + Value = sloTable.Guid + }, + new YdbParameter + { + DbType = DbType.Int32, + ParameterName = "Id", + Value = sloTable.Id + }, + new YdbParameter + { + DbType = DbType.String, + ParameterName = "PayloadStr", + Value = sloTable.PayloadStr + }, + new YdbParameter + { + DbType = DbType.Double, + ParameterName = "PayloadDouble", + Value = sloTable.PayloadDouble + }, + new YdbParameter + { + DbType = DbType.DateTime2, + ParameterName = "PayloadTimestamp", + Value = sloTable.PayloadTimestamp + } + } + }; await ydbCommand.ExecuteNonQueryAsync(); }, context); @@ -60,8 +113,12 @@ protected override async Task Create(YdbDataSource client, string createTableSql ((YdbException)policyResult.FinalException)?.Code ?? StatusCode.Success); } - protected override async Task<(int, StatusCode, object?)> Select(YdbDataSource dataSource, string selectSql, - Dictionary parameters, int readTimeout, Counter? errorsTotal = null) + protected override async Task<(int, StatusCode, object?)> Select( + YdbDataSource client, + (Guid Guid, int Id) select, + int readTimeout, + Counter? errorsTotal = null + ) { var context = new Context(); if (errorsTotal != null) @@ -73,15 +130,21 @@ protected override async Task Create(YdbDataSource client, string createTableSql var policyResult = await _policy.ExecuteAndCaptureAsync(async _ => { attempts++; - await using var ydbConnection = await dataSource.OpenConnectionAsync(); + await using var ydbConnection = await client.OpenConnectionAsync(); var ydbCommand = new YdbCommand(ydbConnection) - { CommandText = selectSql, CommandTimeout = readTimeout }; - - foreach (var (key, value) in parameters) { - ydbCommand.Parameters.AddWithValue(key, value); - } + CommandText = $""" + SELECT Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp + FROM `{SloTable.Name}` WHERE Guid = @Guid AND Id = @Id; + """, + CommandTimeout = readTimeout, + Parameters = + { + new YdbParameter { ParameterName = "Guid", DbType = DbType.Guid, Value = select.Guid }, + new YdbParameter { ParameterName = "Id", DbType = DbType.Int32, Value = select.Id } + } + }; return await ydbCommand.ExecuteScalarAsync(); }, context); @@ -89,23 +152,11 @@ protected override async Task Create(YdbDataSource client, string createTableSql return (attempts, ((YdbException)policyResult.FinalException)?.Code ?? StatusCode.Success, policyResult.Result); } - protected override Task CreateClient(Config config) + protected override async Task SelectCount(YdbDataSource client) { - var splitEndpoint = config.Endpoint.Split("://"); - var useTls = splitEndpoint[0] switch - { - "grpc" => false, - "grpcs" => true, - _ => throw new ArgumentException("Don't support schema: " + splitEndpoint[0]) - }; - - var host = splitEndpoint[1].Split(":")[0]; - var port = splitEndpoint[1].Split(":")[1]; + await using var ydbConnection = await client.OpenConnectionAsync(); - return Task.FromResult(new YdbDataSource(new YdbConnectionStringBuilder - { - UseTls = useTls, Host = host, Port = int.Parse(port), Database = config.Db, - LoggerFactory = ISloContext.Factory - })); + return (int)(await new YdbCommand(ydbConnection) { CommandText = $"SELECT MAX(Id) FROM {SloTable.Name}" } + .ExecuteScalarAsync())!; } } \ No newline at end of file diff --git a/slo/src/EF/EF.csproj b/slo/src/EF/EF.csproj new file mode 100644 index 00000000..bcebb18b --- /dev/null +++ b/slo/src/EF/EF.csproj @@ -0,0 +1,24 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + diff --git a/slo/src/TableService/Program.cs b/slo/src/EF/Program.cs similarity index 54% rename from slo/src/TableService/Program.cs rename to slo/src/EF/Program.cs index 9ae53f20..dc91adc7 100644 --- a/slo/src/TableService/Program.cs +++ b/slo/src/EF/Program.cs @@ -1,6 +1,6 @@ // See https://aka.ms/new-console-template for more information +using EF; using Internal; -using TableService; -return await Cli.Run(new SloTableContext(), args); \ No newline at end of file +await Cli.Run(new SloTableContext(), args); \ No newline at end of file diff --git a/slo/src/EF/SloTableContext.cs b/slo/src/EF/SloTableContext.cs new file mode 100644 index 00000000..40a3fdef --- /dev/null +++ b/slo/src/EF/SloTableContext.cs @@ -0,0 +1,59 @@ +using EntityFrameworkCore.Ydb.Extensions; +using Internal; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Prometheus; +using Ydb.Sdk; + +namespace EF; + +public class SloTableContext : SloTableContext> +{ + protected override string Job => "EF"; + + protected override PooledDbContextFactory CreateClient(Config config) => + new(new DbContextOptionsBuilder().UseYdb(config.ConnectionString).Options); + + protected override async Task Create( + PooledDbContextFactory client, + int operationTimeout + ) + { + await using var dbContext = await client.CreateDbContextAsync(); + await dbContext.Database.EnsureCreatedAsync(); + await dbContext.Database.ExecuteSqlRawAsync(SloTable.Options); + } + + protected override async Task<(int, StatusCode)> Save( + PooledDbContextFactory client, + SloTable sloTable, + int writeTimeout, + Counter? errorsTotal = null) + { + await using var dbContext = await client.CreateDbContextAsync(); + dbContext.SloEntities.Add(sloTable); + await dbContext.SaveChangesAsync(); + + return (1, StatusCode.Success); + } + + protected override async Task<(int, StatusCode, object?)> Select( + PooledDbContextFactory client, + (Guid Guid, int Id) select, + int readTimeout, + Counter? errorsTotal = null + ) + { + await using var dbContext = await client.CreateDbContextAsync(); + await dbContext.SloEntities.FindAsync(select.Guid, select.Id); + + return (0, StatusCode.Success, null); + } + + protected override async Task SelectCount(PooledDbContextFactory client) + { + await using var dbContext = await client.CreateDbContextAsync(); + + return await dbContext.SloEntities.CountAsync(); + } +} \ No newline at end of file diff --git a/slo/src/EF/TableDbContext.cs b/slo/src/EF/TableDbContext.cs new file mode 100644 index 00000000..6280fccf --- /dev/null +++ b/slo/src/EF/TableDbContext.cs @@ -0,0 +1,14 @@ +using Internal; +using Microsoft.EntityFrameworkCore; + +namespace EF; + +public class TableDbContext(DbContextOptions options) : DbContext(options) +{ + public DbSet SloEntities { get; set; } + + protected override void OnModelCreating(ModelBuilder modelBuilder) => + modelBuilder.Entity() + .ToTable(SloTable.Name) + .HasKey(e => new { e.Guid, e.Id }); +} \ No newline at end of file diff --git a/slo/src/Internal/Cli.cs b/slo/src/Internal/Cli.cs index 40a3ea31..c061abd3 100644 --- a/slo/src/Internal/Cli.cs +++ b/slo/src/Internal/Cli.cs @@ -4,23 +4,14 @@ namespace Internal; public static class Cli { - private static readonly Argument EndpointArgument = new( - "endpoint", - "YDB endpoint to connect to"); - - private static readonly Argument DbArgument = new( - "db", - "YDB database to connect to"); + private static readonly Argument ConnectionStringArgument = new( + "connectionString", + "YDB connection string ADO NET format"); private static readonly Option PromPgwOption = new( "--prom-pgw", "prometheus push gateway"); - private static readonly Option ResourceYdbPath = new( - new[] { "-t", "--resource-ydb-path" }, - () => "test-resource", - "resource ydb path to create\n "); - private static readonly Option WriteTimeoutOption = new( "--write-timeout", () => 100, @@ -51,16 +42,6 @@ public static class Cli () => 600, "run time in seconds"); - private static readonly Option MinPartitionsCountOption = new( - "--min-partitions-count", - () => 5, - "minimum amount of partitions in table"); - - private static readonly Option MaxPartitionsCountOption = new( - "--max-partitions-count", - () => 10, - "maximum amount of partitions in table"); - private static readonly Option InitialDataCountOption = new( new[] { "-c", "--initial-data-count" }, () => 1000, @@ -70,11 +51,7 @@ public static class Cli "create", "creates table in database") { - EndpointArgument, - DbArgument, - ResourceYdbPath, - MinPartitionsCountOption, - MaxPartitionsCountOption, + ConnectionStringArgument, InitialDataCountOption, WriteTimeoutOption }; @@ -83,9 +60,7 @@ public static class Cli "run", "runs workload (read and write to table with sets RPS)") { - EndpointArgument, - DbArgument, - ResourceYdbPath, + ConnectionStringArgument, InitialDataCountOption, PromPgwOption, ReportPeriodOption, @@ -103,13 +78,28 @@ public static class Cli public static async Task Run(ISloContext sloContext, string[] args) { - CreateCommand.SetHandler(async createConfig => { await sloContext.Create(createConfig); }, - new CreateConfigBinder(EndpointArgument, DbArgument, ResourceYdbPath, MinPartitionsCountOption, - MaxPartitionsCountOption, InitialDataCountOption, WriteTimeoutOption)); - - RunCommand.SetHandler(async runConfig => { await sloContext.Run(runConfig); }, - new RunConfigBinder(EndpointArgument, DbArgument, ResourceYdbPath, PromPgwOption, ReportPeriodOption, - ReadRpsOption, ReadTimeoutOption, WriteRpsOption, WriteTimeoutOption, TimeOption)); + CreateCommand.SetHandler( + async createConfig => { await sloContext.Create(createConfig); }, + new CreateConfigBinder( + ConnectionStringArgument, + InitialDataCountOption, + WriteTimeoutOption + ) + ); + + RunCommand.SetHandler( + async runConfig => { await sloContext.Run(runConfig); }, + new RunConfigBinder( + ConnectionStringArgument, + PromPgwOption, + ReportPeriodOption, + ReadRpsOption, + ReadTimeoutOption, + WriteRpsOption, + WriteTimeoutOption, + TimeOption + ) + ); return await RootCommand.InvokeAsync(args); } diff --git a/slo/src/Internal/ConfigBinders.cs b/slo/src/Internal/ConfigBinders.cs index d4137865..4119408d 100644 --- a/slo/src/Internal/ConfigBinders.cs +++ b/slo/src/Internal/ConfigBinders.cs @@ -4,45 +4,33 @@ namespace Internal; public class CreateConfigBinder( - Argument endpointArgument, - Argument dbArgument, - Option resourceYdbPath, - Option minPartitionsCountOption, - Option maxPartitionsCountOption, + Argument connectionString, Option initialDataCountOption, - Option writeTimeoutOption) - : BinderBase + Option writeTimeoutOption +) : BinderBase { protected override CreateConfig GetBoundValue(BindingContext bindingContext) => new( - bindingContext.ParseResult.GetValueForArgument(endpointArgument), - bindingContext.ParseResult.GetValueForArgument(dbArgument), - bindingContext.ParseResult.GetValueForOption(resourceYdbPath)!, - bindingContext.ParseResult.GetValueForOption(minPartitionsCountOption), - bindingContext.ParseResult.GetValueForOption(maxPartitionsCountOption), + bindingContext.ParseResult.GetValueForArgument(connectionString), bindingContext.ParseResult.GetValueForOption(initialDataCountOption), bindingContext.ParseResult.GetValueForOption(writeTimeoutOption) ); } internal class RunConfigBinder( - Argument endpointArgument, - Argument dbArgument, - Option resourceYdbPath, + Argument connectionString, Option promPgwOption, Option reportPeriodOption, Option readRpsOption, Option readTimeoutOption, Option writeRpsOption, Option writeTimeoutOption, - Option timeOption) - : BinderBase + Option timeOption +) : BinderBase { protected override RunConfig GetBoundValue(BindingContext bindingContext) => new( - bindingContext.ParseResult.GetValueForArgument(endpointArgument), - bindingContext.ParseResult.GetValueForArgument(dbArgument), - bindingContext.ParseResult.GetValueForOption(resourceYdbPath)!, + bindingContext.ParseResult.GetValueForArgument(connectionString), bindingContext.ParseResult.GetValueForOption(promPgwOption)!, bindingContext.ParseResult.GetValueForOption(reportPeriodOption), bindingContext.ParseResult.GetValueForOption(readRpsOption), diff --git a/slo/src/Internal/Configs.cs b/slo/src/Internal/Configs.cs index 025aefe5..59ac80b2 100644 --- a/slo/src/Internal/Configs.cs +++ b/slo/src/Internal/Configs.cs @@ -1,24 +1,18 @@ namespace Internal; public record CreateConfig( - string Endpoint, - string Db, - string ResourcePathYdb, - int MinPartitionsCount, - int MaxPartitionsCount, + string ConnectionString, int InitialDataCount, - int WriteTimeout) : Config(Endpoint, Db, ResourcePathYdb, WriteTimeout); + int WriteTimeout) : Config(ConnectionString, WriteTimeout); public record RunConfig( - string Endpoint, - string Db, - string ResourcePathYdb, + string ConnectionString, string PromPgw, int ReportPeriod, int ReadRps, int ReadTimeout, int WriteRps, int WriteTimeout, - int Time) : Config(Endpoint, Db, ResourcePathYdb, WriteTimeout); + int Time) : Config(ConnectionString, WriteTimeout); -public record Config(string Endpoint, string Db, string ResourcePathYdb, int WriteTimeout); \ No newline at end of file +public record Config(string ConnectionString, int WriteTimeout); \ No newline at end of file diff --git a/slo/src/Internal/SloTable.cs b/slo/src/Internal/SloTable.cs new file mode 100644 index 00000000..bb66540f --- /dev/null +++ b/slo/src/Internal/SloTable.cs @@ -0,0 +1,23 @@ +using System.ComponentModel.DataAnnotations.Schema; + +namespace Internal; + +[Table("slo_test_table")] +public class SloTable +{ + public const string Name = "slo_test_table"; + + public const string Options = $""" + ALTER TABLE {Name} SET (AUTO_PARTITIONING_BY_SIZE = ENABLED); + ALTER TABLE {Name} SET (AUTO_PARTITIONING_BY_LOAD = ENABLED); + ALTER TABLE {Name} SET (AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 5); + ALTER TABLE {Name} SET (AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 10); + """; + + public Guid Guid { get; set; } + public int Id { get; set; } + + public required string PayloadStr { get; set; } + public double PayloadDouble { get; set; } + public DateTime PayloadTimestamp { get; set; } +} \ No newline at end of file diff --git a/slo/src/Internal/SloTableContext.cs b/slo/src/Internal/SloTableContext.cs index 3842841d..08bf5273 100644 --- a/slo/src/Internal/SloTableContext.cs +++ b/slo/src/Internal/SloTableContext.cs @@ -1,9 +1,9 @@ using System.Diagnostics; +using System.Security.Cryptography; using System.Threading.RateLimiting; using Microsoft.Extensions.Logging; using Prometheus; using Ydb.Sdk; -using Ydb.Sdk.Value; namespace Internal; @@ -19,12 +19,12 @@ public interface ISloContext }); - public Task Create(CreateConfig config); + public Task Create(CreateConfig createConfig); public Task Run(RunConfig runConfig); } -public abstract class SloTableContext : ISloContext where T : IDisposable +public abstract class SloTableContext : ISloContext { protected static readonly ILogger Logger = ISloContext.Factory.CreateLogger>(); @@ -32,37 +32,21 @@ public abstract class SloTableContext : ISloContext where T : IDisposable protected abstract string Job { get; } - public async Task Create(CreateConfig config) + protected abstract T CreateClient(Config config); + + public async Task Create(CreateConfig createConfig) { const int maxCreateAttempts = 10; + var client = CreateClient(createConfig); - using var client = await CreateClient(config); for (var attempt = 0; attempt < maxCreateAttempts; attempt++) { - Logger.LogInformation("Creating table {ResourcePathYdb}..", config.ResourcePathYdb); + Logger.LogInformation("Creating table {Name}...", SloTable.Name); try { - var createTableSql = $""" - CREATE TABLE `{config.ResourcePathYdb}` ( - hash Uint64, - id Int32, - payload_str Text, - payload_double Double, - payload_timestamp Timestamp, - payload_hash Uint64, - PRIMARY KEY (hash, id) - ) WITH ( - AUTO_PARTITIONING_BY_SIZE = ENABLED, - AUTO_PARTITIONING_BY_LOAD = ENABLED, - AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = {config.MinPartitionsCount}, - AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = {config.MaxPartitionsCount} - ); - """; - Logger.LogInformation("YQL script: {sql}", createTableSql); - - await Create(client, createTableSql, config.WriteTimeout); - - Logger.LogInformation("Created table {ResourcePathYdb}", config.ResourcePathYdb); + await Create(client, createConfig.WriteTimeout); + + Logger.LogInformation("Created table {Name}", SloTable.Name); break; } @@ -79,10 +63,10 @@ PRIMARY KEY (hash, id) } } - var tasks = new Task[config.InitialDataCount]; - for (var i = 0; i < config.InitialDataCount; i++) + var tasks = new Task[createConfig.InitialDataCount]; + for (var i = 0; i < createConfig.InitialDataCount; i++) { - tasks[i] = Upsert(client, config); + tasks[i] = Save(client, createConfig); } try @@ -99,21 +83,20 @@ PRIMARY KEY (hash, id) } } - protected abstract Task Create(T client, string createTableSql, int operationTimeout); + protected abstract Task Create(T client, int operationTimeout); public async Task Run(RunConfig runConfig) { // Trace.Listeners.Add(new ConsoleTraceListener()); debug meterPusher var promPgwEndpoint = $"{runConfig.PromPgw}/metrics"; - var client = await CreateClient(runConfig); using var prometheus = new MetricPusher(promPgwEndpoint, "workload-" + Job, intervalMilliseconds: runConfig.ReportPeriod); prometheus.Start(); - var (_, _, maxId) = await Select(client, $"SELECT MAX(id) as max_id FROM `{runConfig.ResourcePathYdb}`;", - new Dictionary(), runConfig.ReadTimeout); - _maxId = (int)maxId!; + var client = CreateClient(runConfig); + + _maxId = await SelectCount(client) + 1; Logger.LogInformation("Init row count: {MaxId}", _maxId); @@ -129,10 +112,10 @@ public async Task Run(RunConfig runConfig) var cancellationTokenSource = new CancellationTokenSource(); cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(runConfig.Time)); - var writeTask = ShootingTask(writeLimiter, "write", Upsert); + var writeTask = ShootingTask(writeLimiter, "write", Save); var readTask = ShootingTask(readLimiter, "read", Select); - Logger.LogInformation("Started write / read shooting.."); + Logger.LogInformation("Started write / read shooting..."); try { @@ -232,25 +215,32 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string operationType, _ = Task.Run(async () => { - pendingOperations.Inc(); - var sw = Stopwatch.StartNew(); - var (attempts, statusCode) = await action(client, runConfig, errorsTotal); - sw.Stop(); - - retryAttempts.Set(attempts); - operationsTotal.Inc(); - pendingOperations.Dec(); - - if (statusCode != StatusCode.Success) + try { - errorsTotal.WithLabels(statusCode.StatusName()).Inc(); - operationsFailureTotal.Inc(); - operationLatencySeconds.WithLabels("err").Observe(sw.Elapsed.TotalSeconds); + pendingOperations.Inc(); + var sw = Stopwatch.StartNew(); + var (attempts, statusCode) = await action(client, runConfig, errorsTotal); + sw.Stop(); + + retryAttempts.Set(attempts); + operationsTotal.Inc(); + pendingOperations.Dec(); + + if (statusCode != StatusCode.Success) + { + errorsTotal.WithLabels(statusCode.StatusName()).Inc(); + operationsFailureTotal.Inc(); + operationLatencySeconds.WithLabels("err").Observe(sw.Elapsed.TotalSeconds); + } + else + { + operationsSuccessTotal.Inc(); + operationLatencySeconds.WithLabels("success").Observe(sw.Elapsed.TotalSeconds); + } } - else + catch (Exception e) { - operationsSuccessTotal.Inc(); - operationLatencySeconds.WithLabels("success").Observe(sw.Elapsed.TotalSeconds); + Logger.LogError(e, "Fail operation!"); } }, cancellationTokenSource.Token); } @@ -261,56 +251,51 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string operationType, } // return attempt count & StatusCode operation - protected abstract Task<(int, StatusCode)> Upsert(T client, string upsertSql, - Dictionary parameters, - int writeTimeout, Counter? errorsTotal = null); + protected abstract Task<(int, StatusCode)> Save(T client, SloTable sloTable, int writeTimeout, + Counter? errorsTotal = null); + + protected abstract Task<(int, StatusCode, object?)> Select(T client, (Guid Guid, int Id) select, int readTimeout, + Counter? errorsTotal = null); - protected abstract Task<(int, StatusCode, object?)> Select(T client, string selectSql, - Dictionary parameters, int readTimeout, Counter? errorsTotal = null); + protected abstract Task SelectCount(T client); - private Task<(int, StatusCode)> Upsert(T client, Config config, Counter? errorsTotal = null) + private Task<(int, StatusCode)> Save(T client, Config config, Counter? errorsTotal = null) { const int minSizeStr = 20; const int maxSizeStr = 40; - return Upsert(client, - $""" - DECLARE $id AS Int32; - DECLARE $payload_str AS Utf8; - DECLARE $payload_double AS Double; - DECLARE $payload_timestamp AS Timestamp; - UPSERT INTO `{config.ResourcePathYdb}` (id, hash, payload_str, payload_double, payload_timestamp) - VALUES ($id, Digest::NumericHash($id), $payload_str, $payload_double, $payload_timestamp) - """, new Dictionary - { - { "$id", YdbValue.MakeInt32(Interlocked.Increment(ref _maxId)) }, - { - "$payload_str", YdbValue.MakeUtf8(string.Join(string.Empty, Enumerable - .Repeat(0, Random.Shared.Next(minSizeStr, maxSizeStr)) - .Select(_ => (char)Random.Shared.Next(127)))) - }, - { "$payload_double", YdbValue.MakeDouble(Random.Shared.NextDouble()) }, - { "$payload_timestamp", YdbValue.MakeTimestamp(DateTime.Now) } - }, config.WriteTimeout, errorsTotal); + var id = Interlocked.Increment(ref _maxId); + var sloTable = new SloTable + { + Guid = GuidFromInt(id), + Id = id, + PayloadStr = string.Join(string.Empty, Enumerable + .Repeat(0, Random.Shared.Next(minSizeStr, maxSizeStr)) + .Select(_ => (char)Random.Shared.Next(127))), + PayloadDouble = Random.Shared.NextDouble(), + PayloadTimestamp = DateTime.Now + }; + + return Save(client, sloTable, config.WriteTimeout, errorsTotal); } - protected abstract Task CreateClient(Config config); - private async Task<(int, StatusCode)> Select(T client, RunConfig config, Counter? errorsTotal = null) { - var (attempts, code, _) = await Select(client, - $""" - DECLARE $id AS Int32; - SELECT id, payload_str, payload_double, payload_timestamp, payload_hash - FROM `{config.ResourcePathYdb}` WHERE id = $id AND hash = Digest::NumericHash($id) - """, - new Dictionary - { - { "$id", YdbValue.MakeInt32(Random.Shared.Next(_maxId)) } - }, config.ReadTimeout, errorsTotal); + var id = Random.Shared.Next(_maxId); + var (attempts, code, _) = + await Select(client, new ValueTuple(GuidFromInt(id), id), config.ReadTimeout, errorsTotal); return (attempts, code); } + + private static Guid GuidFromInt(int value) + { + var intBytes = BitConverter.GetBytes(value); + var hash = SHA1.HashData(intBytes); + var guidBytes = new byte[16]; + Array.Copy(hash, guidBytes, 16); + return new Guid(guidBytes); + } } public static class StatusCodeExtension diff --git a/slo/src/TableService/SloTableContext.cs b/slo/src/TableService/SloTableContext.cs deleted file mode 100644 index 947362a8..00000000 --- a/slo/src/TableService/SloTableContext.cs +++ /dev/null @@ -1,80 +0,0 @@ -using Internal; -using Microsoft.Extensions.Logging; -using Prometheus; -using Ydb.Sdk; -using Ydb.Sdk.Services.Table; -using Ydb.Sdk.Value; - -namespace TableService; - -public class SloTableContext : SloTableContext -{ - private readonly TxControl _txControl = TxControl.BeginSerializableRW().Commit(); - protected override string Job => "TableService"; - - protected override async Task Create(TableClient client, string createTableSql, int operationTimeout) - { - var response = await client.SessionExec(async session => await session.ExecuteSchemeQuery(createTableSql, - new ExecuteSchemeQuerySettings { OperationTimeout = TimeSpan.FromSeconds(operationTimeout) })); - - response.Status.EnsureSuccess(); - } - - protected override async Task<(int, StatusCode)> Upsert(TableClient tableClient, string upsertSql, - Dictionary parameters, int writeTimeout, Counter? errorsGauge = null) - { - var querySettings = new ExecuteDataQuerySettings - { OperationTimeout = TimeSpan.FromSeconds(writeTimeout) }; - - var attempts = 0; - - var response = await tableClient.SessionExec(async session => - { - attempts++; - var response = await session.ExecuteDataQuery(upsertSql, _txControl, parameters, querySettings); - if (response.Status.IsSuccess) - { - return response; - } - - - errorsGauge?.WithLabels(response.Status.StatusCode.ToString(), "retried").Inc(); - - return response; - }); - - return (attempts, response.Status.StatusCode); - } - - protected override async Task<(int, StatusCode, object?)> Select(TableClient tableClient, string selectSql, - Dictionary parameters, int readTimeout, Counter? errorsGauge = null) - { - var querySettings = new ExecuteDataQuerySettings - { OperationTimeout = TimeSpan.FromSeconds(readTimeout) }; - - var attempts = 0; - - var response = (ExecuteDataQueryResponse)await tableClient.SessionExec(async session => - { - attempts++; - var response = await session.ExecuteDataQuery(selectSql, _txControl, parameters, querySettings); - if (response.Status.IsSuccess) - { - return response; - } - - Logger.LogWarning("{}", response.Status.ToString()); - - errorsGauge?.WithLabels(response.Status.StatusCode.StatusName(), "retried").Inc(); - - return response; - }); - - return (attempts, response.Status.StatusCode, - response.Status.IsSuccess ? response.Result.ResultSets[0].Rows[0][0].GetOptionalInt32() : null); - } - - protected override async Task CreateClient(Config config) => new( - await Driver.CreateInitialized(new DriverConfig(config.Endpoint, config.Db), ISloContext.Factory) - ); -} \ No newline at end of file diff --git a/slo/src/TableService/TableService.csproj b/slo/src/TableService/TableService.csproj deleted file mode 100644 index 99a59412..00000000 --- a/slo/src/TableService/TableService.csproj +++ /dev/null @@ -1,14 +0,0 @@ - - - - Exe - net8.0 - enable - enable - slo - - - - - - diff --git a/slo/src/TopicService/SloTopicContext.cs b/slo/src/TopicService/SloTopicContext.cs index c5cfee3f..1d582dbd 100644 --- a/slo/src/TopicService/SloTopicContext.cs +++ b/slo/src/TopicService/SloTopicContext.cs @@ -3,6 +3,7 @@ using Internal; using Microsoft.Extensions.Logging; using Ydb.Sdk; +using Ydb.Sdk.Ado; using Ydb.Sdk.Services.Topic; using Ydb.Sdk.Services.Topic.Reader; using Ydb.Sdk.Services.Topic.Writer; @@ -17,10 +18,15 @@ public class SloTopicContext : ISloContext private static readonly ILogger Logger = ISloContext.Factory.CreateLogger(); - public async Task Create(CreateConfig config) + public async Task Create(CreateConfig createConfig) { + var connectionStringBuilder = new YdbConnectionStringBuilder(createConfig.ConnectionString); + var topicClient = new TopicClient(await Driver.CreateInitialized( - new DriverConfig(config.Endpoint, config.Db), ISloContext.Factory) + new DriverConfig( + $"grpc://{connectionStringBuilder.Host}:{connectionStringBuilder.Port}", + connectionStringBuilder.Database + ), ISloContext.Factory) ); await topicClient.CreateTopic( @@ -46,9 +52,15 @@ await topicClient.CreateTopic( public async Task Run(RunConfig config) { + var connectionStringBuilder = new YdbConnectionStringBuilder(config.ConnectionString); + Logger.LogInformation("Started Run topic slo test"); var driver = await Driver.CreateInitialized( - new DriverConfig(config.Endpoint, config.Db), ISloContext.Factory); + new DriverConfig( + $"grpc://{connectionStringBuilder.Host}:{connectionStringBuilder.Port}", + connectionStringBuilder.Database + ), ISloContext.Factory + ); Logger.LogInformation("Driver is initialized!"); diff --git a/slo/src/src.sln b/slo/src/src.sln index a633ec1b..749a4a08 100644 --- a/slo/src/src.sln +++ b/slo/src/src.sln @@ -3,24 +3,20 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 17 VisualStudioVersion = 17.5.002.0 MinimumVisualStudioVersion = 10.0.40219.1 -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TableService", "TableService\TableService.csproj", "{F27CEA2C-32D7-4673-8F7A-1F15043B21EB}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Internal", "Internal\Internal.csproj", "{E0ECAA3A-3234-459C-B076-6AC53B63ADC7}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AdoNet", "AdoNet\AdoNet.csproj", "{B6FFBFBE-F568-4186-B160-CC35A1BBD050}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TopicService", "TopicService\TopicService.csproj", "{431B6493-440C-4550-86EB-E9446683880E}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EF", "EF\EF.csproj", "{291A2C64-8A20-40AA-B68A-B9E2A730B66C}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU Release|Any CPU = Release|Any CPU EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution - {F27CEA2C-32D7-4673-8F7A-1F15043B21EB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {F27CEA2C-32D7-4673-8F7A-1F15043B21EB}.Debug|Any CPU.Build.0 = Debug|Any CPU - {F27CEA2C-32D7-4673-8F7A-1F15043B21EB}.Release|Any CPU.ActiveCfg = Release|Any CPU - {F27CEA2C-32D7-4673-8F7A-1F15043B21EB}.Release|Any CPU.Build.0 = Release|Any CPU {E0ECAA3A-3234-459C-B076-6AC53B63ADC7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {E0ECAA3A-3234-459C-B076-6AC53B63ADC7}.Debug|Any CPU.Build.0 = Debug|Any CPU {E0ECAA3A-3234-459C-B076-6AC53B63ADC7}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -33,6 +29,10 @@ Global {431B6493-440C-4550-86EB-E9446683880E}.Debug|Any CPU.Build.0 = Debug|Any CPU {431B6493-440C-4550-86EB-E9446683880E}.Release|Any CPU.ActiveCfg = Release|Any CPU {431B6493-440C-4550-86EB-E9446683880E}.Release|Any CPU.Build.0 = Release|Any CPU + {291A2C64-8A20-40AA-B68A-B9E2A730B66C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {291A2C64-8A20-40AA-B68A-B9E2A730B66C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {291A2C64-8A20-40AA-B68A-B9E2A730B66C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {291A2C64-8A20-40AA-B68A-B9E2A730B66C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE