Skip to content

Commit 9e7715f

Browse files
committed
create slo lonq2db
1 parent 523bbd8 commit 9e7715f

File tree

4 files changed

+351
-0
lines changed

4 files changed

+351
-0
lines changed

slo/src/Linq2db/Linq2db.csproj

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net8.0</TargetFramework>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
<Nullable>enable</Nullable>
8+
<RootNamespace>Linq2dbTest</RootNamespace>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<PackageReference Include="linq2db" Version="6.0.0-rc.3" />
13+
<ProjectReference Include="..\Internal\Internal.csproj" />
14+
<PackageReference Include="Grpc.Core" Version="2.46.6" />
15+
</ItemGroup>
16+
17+
</Project>

slo/src/Linq2db/Program.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
using Internal;
2+
using Linq2db;
3+
4+
await Cli.Run(new SloLinq2DbContext(), args);
Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
using System.Diagnostics;
2+
using System.Security.Cryptography;
3+
using System.Threading.RateLimiting;
4+
using Grpc.Core;
5+
using Internal;
6+
using LinqToDB;
7+
using LinqToDB.Async;
8+
using LinqToDB.Data;
9+
using LinqToDB.Mapping;
10+
using LinqToDB.DataProvider;
11+
using Microsoft.Extensions.Logging;
12+
using Prometheus;
13+
using Ydb.Sdk.Ado;
14+
15+
namespace Linq2db;
16+
17+
/// <summary>
18+
/// SLO harness implemented on top of LINQ to DB provider for YDB.
19+
/// Mirrors behavior of other SLO contexts (ADO.NET/EF/Topic) in this repo.
20+
/// </summary>
21+
public sealed class SloLinq2DbContext : ISloContext
22+
{
23+
private static readonly ILogger Logger = ISloContext.Factory.CreateLogger<SloLinq2DbContext>();
24+
25+
// Prometheus metrics (shared labels: operation, status)
26+
private static readonly Counter Requests = Metrics.CreateCounter(
27+
"ydb_slo_requests_total",
28+
"Total number of SLO operations processed.",
29+
new CounterConfiguration { LabelNames = ["operation", "status"] });
30+
31+
private static readonly Histogram Duration = Metrics.CreateHistogram(
32+
"ydb_slo_duration_seconds",
33+
"Duration of SLO operations.",
34+
new HistogramConfiguration {
35+
LabelNames = ["operation", "status"],
36+
Buckets = Histogram.ExponentialBuckets(start: 0.002, factor: 1.5, count: 20)
37+
});
38+
39+
public async Task Create(CreateConfig config)
40+
{
41+
Logger.LogInformation("Create: connection={ConnectionString}, initialCount={InitialCount}, writeTimeout={Timeout}s",
42+
config.ConnectionString, config.InitialDataCount, config.WriteTimeout);
43+
44+
using var ydb = new YdbConnection(config.ConnectionString);
45+
await ydb.OpenAsync();
46+
47+
var provider = ResolveYdbProvider();
48+
using var db = new DataConnection(provider, ydb);
49+
db.AddMappingSchema(CreateMapping());
50+
51+
await EnsureTableAsync(db);
52+
53+
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(config.WriteTimeout));
54+
var now = DateTime.UtcNow;
55+
56+
const int batchSize = 500;
57+
int total = config.InitialDataCount;
58+
int inserted = 0;
59+
60+
for (int i = 1; i <= total; i += batchSize)
61+
{
62+
var take = Math.Min(batchSize, total - i + 1);
63+
var batch = new List<SloTable>(capacity: take);
64+
for (int j = 0; j < take; j++)
65+
{
66+
var id = i + j;
67+
batch.Add(new SloTable
68+
{
69+
Guid = MakeGuidFromInt(id),
70+
Id = id,
71+
PayloadStr = $"seed-{id}",
72+
PayloadDouble = id * 1.0,
73+
PayloadTimestamp = now
74+
});
75+
}
76+
77+
try
78+
{
79+
await db.BulkCopyAsync(new BulkCopyOptions { KeepIdentity = true }, batch, cts.Token);
80+
inserted += batch.Count;
81+
}
82+
catch (NotSupportedException)
83+
{
84+
foreach (var e in batch)
85+
{
86+
await db.InsertAsync(e, token: cts.Token);
87+
inserted++;
88+
}
89+
}
90+
}
91+
92+
Logger.LogInformation("Create finished. Seeded: {Inserted} rows.", inserted);
93+
}
94+
95+
public async Task Run(RunConfig config)
96+
{
97+
Logger.LogInformation(
98+
"Run: conn={Conn}, pgw={Pgw}, period={Period}ms, readRps={ReadRps}, readTimeout={ReadTimeout}s, writeRps={WriteRps}, writeTimeout={WriteTimeout}s, time={Time}s",
99+
config.ConnectionString, config.PromPgw, config.ReportPeriod, config.ReadRps, config.ReadTimeout,
100+
config.WriteRps, config.WriteTimeout, config.Time);
101+
102+
using var pusher = new MetricPusher(new MetricPusherOptions
103+
{
104+
Endpoint = config.PromPgw,
105+
Job = "ydb_slo_linq2db",
106+
Instance = Environment.MachineName,
107+
ReplaceOnPush = true,
108+
IntervalMilliseconds = config.ReportPeriod
109+
});
110+
pusher.Start();
111+
112+
using var ydb = new YdbConnection(config.ConnectionString);
113+
await ydb.OpenAsync();
114+
115+
var provider = ResolveYdbProvider();
116+
using var db = new DataConnection(provider, ydb);
117+
db.AddMappingSchema(CreateMapping());
118+
119+
// Get current max Id
120+
var maxId = await db.GetTable<SloTable>().Select(t => (int?)t.Id).MaxAsync() ?? 0;
121+
var nextWriteId = maxId;
122+
123+
var readLimiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions
124+
{
125+
TokenLimit = Math.Max(1, config.ReadRps),
126+
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
127+
QueueLimit = 0,
128+
ReplenishmentPeriod = TimeSpan.FromSeconds(1),
129+
TokensPerPeriod = Math.Max(1, config.ReadRps),
130+
AutoReplenishment = true
131+
});
132+
133+
var writeLimiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions
134+
{
135+
TokenLimit = Math.Max(1, config.WriteRps),
136+
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
137+
QueueLimit = 0,
138+
ReplenishmentPeriod = TimeSpan.FromSeconds(1),
139+
TokensPerPeriod = Math.Max(1, config.WriteRps),
140+
AutoReplenishment = true
141+
});
142+
143+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(config.Time));
144+
145+
var readTask = Task.Run(() => LoopAsync("read", ReadOnceAsync), cts.Token);
146+
var writeTask = Task.Run(() => LoopAsync("write", WriteOnceAsync), cts.Token);
147+
148+
try
149+
{
150+
await Task.WhenAll(readTask, writeTask);
151+
}
152+
catch (Exception ex)
153+
{
154+
Logger.LogInformation(ex, "Run finished with cancellation or error.");
155+
}
156+
157+
pusher.Stop();
158+
Logger.LogInformation("Run task is finished.");
159+
160+
return;
161+
162+
async Task LoopAsync(string operation, Func<CancellationToken, Task> action)
163+
{
164+
var limiter = operation == "read" ? readLimiter : writeLimiter;
165+
var timeout = TimeSpan.FromSeconds(operation == "read" ? config.ReadTimeout : config.WriteTimeout);
166+
167+
while (!cts.IsCancellationRequested)
168+
{
169+
using var lease = await limiter.AcquireAsync(permitCount: 1, cancellationToken: cts.Token);
170+
if (!lease.IsAcquired) continue;
171+
172+
using var rpcCts = new CancellationTokenSource(timeout);
173+
var sw = Stopwatch.StartNew();
174+
string status = "OK";
175+
176+
try
177+
{
178+
await action(rpcCts.Token);
179+
}
180+
catch (RpcException rpcEx)
181+
{
182+
status = $"GRPC_{rpcEx.Status.StatusCode}";
183+
Logger.LogWarning(rpcEx, "GRPC error in {Operation}", operation);
184+
}
185+
catch (Exception ex) when (TryExtractStatusLabel(ex, out var statusLabel))
186+
{
187+
status = statusLabel;
188+
Logger.LogWarning(ex, "Provider error in {Operation}", operation);
189+
}
190+
catch (Exception ex)
191+
{
192+
status = "EXCEPTION";
193+
Logger.LogWarning(ex, "Unhandled error in {Operation}", operation);
194+
}
195+
finally
196+
{
197+
sw.Stop();
198+
Requests.WithLabels(operation, status).Inc();
199+
Duration.WithLabels(operation, status).Observe(sw.Elapsed.TotalSeconds);
200+
}
201+
}
202+
}
203+
204+
async Task ReadOnceAsync(CancellationToken token)
205+
{
206+
var currentMax = Math.Max(1, Volatile.Read(ref nextWriteId));
207+
var id = Random.Shared.Next(1, currentMax + 1);
208+
var guid = MakeGuidFromInt(id);
209+
210+
_ = await db.GetTable<SloTable>()
211+
.Where(t => t.Guid == guid && t.Id == id)
212+
.FirstOrDefaultAsync(token);
213+
}
214+
215+
async Task WriteOnceAsync(CancellationToken token)
216+
{
217+
var id = Interlocked.Increment(ref nextWriteId);
218+
var entity = new SloTable
219+
{
220+
Guid = MakeGuidFromInt(id),
221+
Id = id,
222+
PayloadStr = $"write-{id}",
223+
PayloadDouble = id * 1.0,
224+
PayloadTimestamp = DateTime.UtcNow
225+
};
226+
227+
await db.InsertAsync(entity, token: token);
228+
}
229+
}
230+
231+
private static MappingSchema CreateMapping()
232+
{
233+
var ms = new MappingSchema();
234+
var fb = new FluentMappingBuilder(ms);
235+
236+
fb.Entity<SloTable>()
237+
.HasTableName(SloTable.Name)
238+
.Property(e => e.Guid).IsPrimaryKey().IsNullable(false)
239+
.Property(e => e.Id).IsPrimaryKey().IsNullable(false)
240+
.Property(e => e.PayloadStr).IsNullable(false)
241+
.Property(e => e.PayloadDouble).IsNullable(false)
242+
.Property(e => e.PayloadTimestamp).IsNullable(false);
243+
244+
return ms;
245+
}
246+
247+
private static async Task EnsureTableAsync(DataConnection db)
248+
{
249+
try { await db.ExecuteAsync($"DROP TABLE {SloTable.Name};"); } catch { /* ignore */ }
250+
251+
var create = $@"
252+
CREATE TABLE {SloTable.Name} (
253+
Guid Uuid,
254+
Id Int32,
255+
PayloadStr Utf8,
256+
PayloadDouble Double,
257+
PayloadTimestamp Timestamp,
258+
PRIMARY KEY (Guid, Id)
259+
);";
260+
261+
await db.ExecuteAsync(create);
262+
263+
foreach (var stmt in Internal.SloTable.Options.Split(';', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries))
264+
await db.ExecuteAsync(stmt + ";");
265+
}
266+
267+
private static Guid MakeGuidFromInt(int id)
268+
{
269+
Span<byte> intBytes = stackalloc byte[4];
270+
BitConverter.TryWriteBytes(intBytes, id);
271+
var hash = SHA1.HashData(intBytes);
272+
Span<byte> guidBytes = stackalloc byte[16];
273+
hash.AsSpan(0,16).CopyTo(guidBytes);
274+
return new Guid(guidBytes);
275+
}
276+
277+
private static bool TryExtractStatusLabel(Exception ex, out string label)
278+
{
279+
label = "";
280+
for (var e = ex; e != null; e = e.InnerException!)
281+
{
282+
var prop = e.GetType().GetProperty("StatusCode");
283+
if (prop != null && prop.PropertyType.IsEnum)
284+
{
285+
var val = prop.GetValue(e);
286+
var typeName = prop.PropertyType.FullName ?? prop.PropertyType.Name;
287+
if (typeName.Contains("Ydb", StringComparison.OrdinalIgnoreCase))
288+
{
289+
label = $"YDB_{val}";
290+
return true;
291+
}
292+
if (typeName.Contains("Grpc", StringComparison.OrdinalIgnoreCase))
293+
{
294+
label = $"GRPC_{val}";
295+
return true;
296+
}
297+
label = $"STATUS_{val}";
298+
return true;
299+
}
300+
}
301+
return false;
302+
}
303+
304+
private static IDataProvider ResolveYdbProvider()
305+
{
306+
var asms = AppDomain.CurrentDomain.GetAssemblies();
307+
foreach (var asm in asms)
308+
{
309+
foreach (var t in asm.GetTypes())
310+
{
311+
if (typeof(IDataProvider).IsAssignableFrom(t) && !t.IsAbstract && !t.IsInterface)
312+
{
313+
var name = t.FullName ?? t.Name;
314+
if (name.Contains("Ydb", StringComparison.OrdinalIgnoreCase) ||
315+
name.Contains("YDB", StringComparison.OrdinalIgnoreCase))
316+
{
317+
return (IDataProvider)Activator.CreateInstance(t)!;
318+
}
319+
}
320+
}
321+
}
322+
throw new InvalidOperationException("YDB IDataProvider not found. Ensure your Linq2DB YDB provider assembly is referenced.");
323+
}
324+
}

slo/src/src.sln

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EF", "EF\EF.csproj", "{291A
1313
EndProject
1414
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AdoNet.Dapper", "Dapper\AdoNet.Dapper.csproj", "{A6B9B4F1-4C7C-42C1-A212-B71A9B0D67F7}"
1515
EndProject
16+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Linq2db", "Linq2db\Linq2db.csproj", "{A0AB76CF-A89C-43DE-99C8-0E1C3E539F98}"
17+
EndProject
1618
Global
1719
GlobalSection(SolutionConfigurationPlatforms) = preSolution
1820
Debug|Any CPU = Debug|Any CPU
@@ -39,6 +41,10 @@ Global
3941
{A6B9B4F1-4C7C-42C1-A212-B71A9B0D67F7}.Debug|Any CPU.Build.0 = Debug|Any CPU
4042
{A6B9B4F1-4C7C-42C1-A212-B71A9B0D67F7}.Release|Any CPU.ActiveCfg = Release|Any CPU
4143
{A6B9B4F1-4C7C-42C1-A212-B71A9B0D67F7}.Release|Any CPU.Build.0 = Release|Any CPU
44+
{A0AB76CF-A89C-43DE-99C8-0E1C3E539F98}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
45+
{A0AB76CF-A89C-43DE-99C8-0E1C3E539F98}.Debug|Any CPU.Build.0 = Debug|Any CPU
46+
{A0AB76CF-A89C-43DE-99C8-0E1C3E539F98}.Release|Any CPU.ActiveCfg = Release|Any CPU
47+
{A0AB76CF-A89C-43DE-99C8-0E1C3E539F98}.Release|Any CPU.Build.0 = Release|Any CPU
4248
EndGlobalSection
4349
GlobalSection(SolutionProperties) = preSolution
4450
HideSolutionNode = FALSE

0 commit comments

Comments
 (0)