1+ using System . Data ;
12using Internal ;
23using Microsoft . Extensions . Logging ;
34using Polly ;
45using Prometheus ;
56using Ydb . Sdk ;
67using Ydb . Sdk . Ado ;
7- using Ydb . Sdk . Value ;
88
99namespace AdoNet ;
1010
@@ -22,17 +22,36 @@ public class SloTableContext : SloTableContext<YdbDataSource>
2222
2323 protected override string Job => "AdoNet" ;
2424
25- protected override async Task Create ( YdbDataSource client , string createTableSql , int operationTimeout )
25+ protected override YdbDataSource CreateClient ( Config config ) => new (
26+ new YdbConnectionStringBuilder ( config . ConnectionString ) { LoggerFactory = ISloContext . Factory }
27+ ) ;
28+
29+ protected override async Task Create ( YdbDataSource client , int operationTimeout )
2630 {
2731 await using var ydbConnection = await client . OpenConnectionAsync ( ) ;
28-
2932 await new YdbCommand ( ydbConnection )
30- { CommandText = createTableSql , CommandTimeout = operationTimeout }
31- . ExecuteNonQueryAsync ( ) ;
33+ {
34+ CommandText = $ """
35+ CREATE TABLE `{ SloTable . Name } ` (
36+ Guid UUID,
37+ Id Int32,
38+ PayloadStr Text,
39+ PayloadDouble Double,
40+ PayloadTimestamp Timestamp,
41+ PRIMARY KEY (Guid, Id)
42+ );
43+ { SloTable . Options }
44+ """ ,
45+ CommandTimeout = operationTimeout
46+ } . ExecuteNonQueryAsync ( ) ;
3247 }
3348
34- protected override async Task < ( int , StatusCode ) > Upsert ( YdbDataSource dataSource , string upsertSql ,
35- Dictionary < string , YdbValue > parameters , int writeTimeout , Counter ? errorsTotal = null )
49+ protected override async Task < ( int , StatusCode ) > Save (
50+ YdbDataSource client ,
51+ SloTable sloTable ,
52+ int writeTimeout ,
53+ Counter ? errorsTotal = null
54+ )
3655 {
3756 var context = new Context ( ) ;
3857 if ( errorsTotal != null )
@@ -42,15 +61,49 @@ protected override async Task Create(YdbDataSource client, string createTableSql
4261
4362 var policyResult = await _policy . ExecuteAndCaptureAsync ( async _ =>
4463 {
45- await using var ydbConnection = await dataSource . OpenConnectionAsync ( ) ;
64+ await using var ydbConnection = await client . OpenConnectionAsync ( ) ;
4665
4766 var ydbCommand = new YdbCommand ( ydbConnection )
48- { CommandText = upsertSql , CommandTimeout = writeTimeout } ;
49-
50- foreach ( var ( key , value ) in parameters )
5167 {
52- ydbCommand . Parameters . AddWithValue ( key , value ) ;
53- }
68+ CommandText = $ """
69+ INSERT INTO `{ SloTable . Name } ` (Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp)
70+ VALUES (@Guid, @Id, @PayloadStr, @PayloadDouble, @PayloadTimestamp)
71+ """ ,
72+ CommandTimeout = writeTimeout ,
73+ Parameters =
74+ {
75+ new YdbParameter
76+ {
77+ DbType = DbType . Guid ,
78+ ParameterName = "Guid" ,
79+ Value = sloTable . Guid
80+ } ,
81+ new YdbParameter
82+ {
83+ DbType = DbType . Int32 ,
84+ ParameterName = "Id" ,
85+ Value = sloTable . Id
86+ } ,
87+ new YdbParameter
88+ {
89+ DbType = DbType . String ,
90+ ParameterName = "PayloadStr" ,
91+ Value = sloTable . PayloadStr
92+ } ,
93+ new YdbParameter
94+ {
95+ DbType = DbType . Double ,
96+ ParameterName = "PayloadDouble" ,
97+ Value = sloTable . PayloadDouble
98+ } ,
99+ new YdbParameter
100+ {
101+ DbType = DbType . DateTime2 ,
102+ ParameterName = "PayloadTimestamp" ,
103+ Value = sloTable . PayloadTimestamp
104+ }
105+ }
106+ } ;
54107
55108 await ydbCommand . ExecuteNonQueryAsync ( ) ;
56109 } , context ) ;
@@ -60,8 +113,12 @@ protected override async Task Create(YdbDataSource client, string createTableSql
60113 ( ( YdbException ) policyResult . FinalException ) ? . Code ?? StatusCode . Success ) ;
61114 }
62115
63- protected override async Task < ( int , StatusCode , object ? ) > Select ( YdbDataSource dataSource , string selectSql ,
64- Dictionary < string , YdbValue > parameters , int readTimeout , Counter ? errorsTotal = null )
116+ protected override async Task < ( int , StatusCode , object ? ) > Select (
117+ YdbDataSource client ,
118+ ( Guid Guid , int Id ) select ,
119+ int readTimeout ,
120+ Counter ? errorsTotal = null
121+ )
65122 {
66123 var context = new Context ( ) ;
67124 if ( errorsTotal != null )
@@ -73,39 +130,33 @@ protected override async Task Create(YdbDataSource client, string createTableSql
73130 var policyResult = await _policy . ExecuteAndCaptureAsync ( async _ =>
74131 {
75132 attempts ++ ;
76- await using var ydbConnection = await dataSource . OpenConnectionAsync ( ) ;
133+ await using var ydbConnection = await client . OpenConnectionAsync ( ) ;
77134
78135 var ydbCommand = new YdbCommand ( ydbConnection )
79- { CommandText = selectSql , CommandTimeout = readTimeout } ;
80-
81- foreach ( var ( key , value ) in parameters )
82136 {
83- ydbCommand . Parameters . AddWithValue ( key , value ) ;
84- }
137+ CommandText = $ """
138+ SELECT Guid, Id, PayloadStr, PayloadDouble, PayloadTimestamp
139+ FROM `{ SloTable . Name } ` WHERE Guid = @Guid AND Id = @Id;
140+ """ ,
141+ CommandTimeout = readTimeout ,
142+ Parameters =
143+ {
144+ new YdbParameter { ParameterName = "Guid" , DbType = DbType . Guid , Value = select . Guid } ,
145+ new YdbParameter { ParameterName = "Id" , DbType = DbType . Int32 , Value = select . Id }
146+ }
147+ } ;
85148
86149 return await ydbCommand . ExecuteScalarAsync ( ) ;
87150 } , context ) ;
88151
89152 return ( attempts , ( ( YdbException ) policyResult . FinalException ) ? . Code ?? StatusCode . Success , policyResult . Result ) ;
90153 }
91154
92- protected override Task < YdbDataSource > CreateClient ( Config config )
155+ protected override async Task < int > SelectCount ( YdbDataSource client )
93156 {
94- var splitEndpoint = config . Endpoint . Split ( "://" ) ;
95- var useTls = splitEndpoint [ 0 ] switch
96- {
97- "grpc" => false ,
98- "grpcs" => true ,
99- _ => throw new ArgumentException ( "Don't support schema: " + splitEndpoint [ 0 ] )
100- } ;
101-
102- var host = splitEndpoint [ 1 ] . Split ( ":" ) [ 0 ] ;
103- var port = splitEndpoint [ 1 ] . Split ( ":" ) [ 1 ] ;
157+ await using var ydbConnection = await client . OpenConnectionAsync ( ) ;
104158
105- return Task . FromResult ( new YdbDataSource ( new YdbConnectionStringBuilder
106- {
107- UseTls = useTls , Host = host , Port = int . Parse ( port ) , Database = config . Db ,
108- LoggerFactory = ISloContext . Factory
109- } ) ) ;
159+ return ( int ) ( await new YdbCommand ( ydbConnection ) { CommandText = $ "SELECT MAX(Id) FROM { SloTable . Name } " }
160+ . ExecuteScalarAsync ( ) ) ! ;
110161 }
111162}
0 commit comments