1+ using Dapper ;
2+ using Internal ;
3+ using Microsoft . Extensions . Logging ;
4+ using Polly ;
5+ using Ydb . Sdk ;
6+ using Ydb . Sdk . Ado ;
7+
8+ namespace AdoNet . Dapper ;
9+
10+ public class SloTableContext : SloTableContext < YdbDataSource >
11+ {
12+ private static readonly AsyncPolicy Policy = Polly . Policy . Handle < YdbException > ( exception => exception . IsTransient )
13+ . WaitAndRetryAsync ( 10 , attempt => TimeSpan . FromMilliseconds ( attempt * 10 ) ,
14+ ( e , _ , _ , _ ) => { Logger . LogWarning ( e , "Failed read / write operation" ) ; } ) ;
15+
16+ protected override string Job => "Dapper" ;
17+
18+ protected override YdbDataSource CreateClient ( Config config ) => new (
19+ new YdbConnectionStringBuilder ( config . ConnectionString ) { LoggerFactory = ISloContext . Factory }
20+ ) ;
21+
22+ protected override async Task Create ( YdbDataSource client , int operationTimeout )
23+ {
24+ await using var connection = await client . OpenConnectionAsync ( ) ;
25+ await connection . ExecuteAsync ( $ """
26+ CREATE TABLE `{ SloTable . Name } ` (
27+ Guid Uuid,
28+ Id Int32,
29+ PayloadStr Text,
30+ PayloadDouble Double,
31+ PayloadTimestamp Timestamp,
32+ PRIMARY KEY (Guid, Id)
33+ );
34+ { SloTable . Options }
35+ """ ) ;
36+ }
37+
38+ protected override async Task < ( int , StatusCode ) > Save ( YdbDataSource client , SloTable sloTable , int writeTimeout )
39+ {
40+ var attempt = 0 ;
41+ var policyResult = await Policy . ExecuteAndCaptureAsync ( async _ =>
42+ {
43+ attempt ++ ;
44+ await using var connection = await client . OpenConnectionAsync ( ) ;
45+ await connection . ExecuteAsync ( $ """
46+ UPSERT INTO `{ SloTable . Name } ` (Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp)
47+ VALUES (@Guid, @Id, @PayloadStr, @PayloadDouble, @PayloadTimestamp)
48+ """ , sloTable ) ;
49+ } , new Context ( )
50+ ) ;
51+
52+ return ( attempt , ( ( YdbException ) policyResult . FinalException ) ? . Code ?? StatusCode . Success ) ;
53+ }
54+
55+ protected override async Task < ( int , StatusCode , object ? ) > Select ( YdbDataSource client , ( Guid Guid , int Id ) select ,
56+ int readTimeout )
57+ {
58+ var attempts = 0 ;
59+ var policyResult = await Policy . ExecuteAndCaptureAsync ( async _ =>
60+ {
61+ attempts ++ ;
62+ await using var connection = await client . OpenConnectionAsync ( ) ;
63+
64+ return await connection . QueryFirstOrDefaultAsync < SloTable > (
65+ $ """
66+ SELECT Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp
67+ FROM `{ SloTable . Name } ` WHERE Guid = @Guid AND Id = @Id;
68+ """ ,
69+ new { select . Guid , select . Id }
70+ ) ;
71+ } , new Context ( ) ) ;
72+
73+ return ( attempts , ( ( YdbException ) policyResult . FinalException ) ? . Code ?? StatusCode . Success , policyResult . Result ) ;
74+ }
75+
76+ protected override async Task < int > SelectCount ( YdbDataSource client )
77+ {
78+ await using var connection = await client . OpenConnectionAsync ( ) ;
79+ return await connection . ExecuteScalarAsync < int > ( $ "SELECT MAX(Id) FROM { SloTable . Name } ") ;
80+ }
81+ }
0 commit comments