Skip to content

Commit f5edd58

Browse files
committed
using ydb.sdk retry
1 parent 4ee4209 commit f5edd58

File tree

4 files changed

+225
-177
lines changed

4 files changed

+225
-177
lines changed

slo/src/Linq2db.Slo/Program.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1-
using Internal;
1+
using Linq2db;
2+
using Internal;
23

3-
await Cli.Run(new Linq2db.SloTableContext(), args);
4+
await Cli.Run(new SloTableContext(), args);
Lines changed: 58 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,28 @@
1-
using Internal;
1+
using LinqToDB;
22
using LinqToDB.Data;
3+
using LinqToDB.Mapping;
4+
using Internal;
5+
using LinqToDB.Async;
6+
using LinqToDB.Internal.DataProvider.Ydb.Internal;
37

48
namespace Linq2db;
59

610
public sealed class SloTableContext : SloTableContext<SloTableContext.Linq2dbClient>
711
{
812
protected override string Job => "Linq2DB";
913

14+
// ВКЛЮЧАЕМ ретраи SDK глобально для всех DataConnection
15+
static SloTableContext()
16+
{
17+
YdbSdkRetryPolicyRegistration.UseGloballyWithIdempotence(
18+
maxAttempts: 10,
19+
onRetry: (attempt, ex, delay) =>
20+
{
21+
// здесь при желании снимать метрики: attempt, delay, ((YdbException)ex).Code
22+
}
23+
);
24+
}
25+
1026
public sealed class Linq2dbClient
1127
{
1228
private readonly string _connectionString;
@@ -16,7 +32,8 @@ public Linq2dbClient(string connectionString)
1632
_connectionString = connectionString;
1733
}
1834

19-
public DataConnection Open() => new DataConnection("YDB", _connectionString);
35+
public DataConnection Open()
36+
=> new DataConnection("YDB", _connectionString);
2037
}
2138

2239
protected override Linq2dbClient CreateClient(Config config)
@@ -27,54 +44,64 @@ protected override async Task Create(Linq2dbClient client, int operationTimeout)
2744
await using var db = client.Open();
2845
db.CommandTimeout = operationTimeout;
2946

30-
// 1) CREATE TABLE
31-
await db.ExecuteAsync($@"
32-
CREATE TABLE `{SloTable.Name}` (
33-
Guid Uuid,
34-
Id Int32,
35-
PayloadStr Text,
36-
PayloadDouble Double,
37-
PayloadTimestamp Timestamp,
38-
PRIMARY KEY (Guid, Id)
39-
);
40-
");
41-
42-
await db.ExecuteAsync(SloTable.Options);
47+
try
48+
{
49+
await db.ExecuteAsync($@"
50+
CREATE TABLE `{SloTable.Name}` (
51+
Guid Uuid,
52+
Id Int32,
53+
PayloadStr Utf8,
54+
PayloadDouble Double,
55+
PayloadTimestamp Timestamp,
56+
PRIMARY KEY (Guid, Id)
57+
)");
58+
}
59+
catch
60+
{
61+
// YDB не поддерживает IF NOT EXISTS; если таблица есть — окей
62+
}
63+
64+
if (!string.IsNullOrWhiteSpace(SloTable.Options))
65+
{
66+
await db.ExecuteAsync(SloTable.Options);
67+
}
4368
}
4469

45-
protected override async Task<int> Save(Linq2dbClient client, SloTable row, int writeTimeout)
70+
protected override async Task<int> Save(Linq2dbClient client, SloTable sloTable, int writeTimeout)
4671
{
4772
await using var db = client.Open();
4873
db.CommandTimeout = writeTimeout;
4974

5075
await db.ExecuteAsync($@"
51-
UPSERT INTO `{SloTable.Name}` (Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp)
52-
VALUES ({row.Guid}, {row.Id}, {row.PayloadStr}, {row.PayloadDouble}, {row.PayloadTimestamp});
53-
");
76+
UPSERT INTO `{SloTable.Name}` (Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp)
77+
VALUES ({sloTable.Guid}, {sloTable.Id}, {sloTable.PayloadStr}, {sloTable.PayloadDouble}, {sloTable.PayloadTimestamp});
78+
");
5479

55-
return 1;
80+
return 0;
5681
}
5782

58-
protected override async Task<object?> Select(Linq2dbClient client, (Guid Guid, int Id) key, int readTimeout)
83+
protected override async Task<object?> Select(Linq2dbClient client, (Guid Guid, int Id) select, int readTimeout)
5984
{
6085
await using var db = client.Open();
6186
db.CommandTimeout = readTimeout;
6287

63-
var exists = await db.ExecuteAsync<int>($@"
64-
SELECT COUNT(*) FROM `{SloTable.Name}` WHERE Guid = {key.Guid} AND Id = {key.Id};
65-
");
66-
67-
return exists > 0 ? 1 : null;
88+
var t = db.GetTable<SloRow>();
89+
return await t.FirstOrDefaultAsync(r => r.Guid == select.Guid && r.Id == select.Id);
6890
}
6991

7092
protected override async Task<int> SelectCount(Linq2dbClient client)
7193
{
7294
await using var db = client.Open();
95+
return await db.GetTable<SloRow>().CountAsync();
96+
}
7397

74-
var maxId = await db.ExecuteAsync<int?>($@"
75-
SELECT MAX(Id) FROM `{SloTable.Name}`;
76-
");
77-
78-
return maxId ?? 0;
98+
[Table(SloTable.Name)]
99+
private sealed class SloRow
100+
{
101+
[Column] public Guid Guid { get; set; }
102+
[Column] public int Id { get; set; }
103+
[Column] public string? PayloadStr { get; set; }
104+
[Column] public double PayloadDouble { get; set; }
105+
[Column] public DateTime PayloadTimestamp { get; set; }
79106
}
80107
}

src/Linq2db.Ydb/src/Internal/YdbRetryPolicy.cs

Lines changed: 0 additions & 144 deletions
This file was deleted.

0 commit comments

Comments
 (0)