11using System . Collections . Concurrent ;
2+ using System . Diagnostics ;
23using System . Text ;
4+ using System . Threading . RateLimiting ;
35using Internal ;
46using Microsoft . Extensions . Logging ;
7+ using Prometheus ;
58using Ydb . Sdk ;
69using Ydb . Sdk . Services . Topic ;
710using Ydb . Sdk . Services . Topic . Reader ;
@@ -11,6 +14,7 @@ namespace TopicService;
1114
1215public class SloTopicContext : ISloContext
1316{
17+ private const string Job = "TopicService" ;
1418 private const string PathTopic = "/Root/testdb/slo-topic" ;
1519 private const string ConsumerName = "Consumer" ;
1620 private const int PartitionSize = 10 ;
@@ -45,6 +49,15 @@ public async Task Run(RunConfig config)
4549 {
4650 var driver = await Driver . CreateInitialized (
4751 new DriverConfig ( config . Endpoint , config . Db ) , ISloContext . Factory ) ;
52+ var promPgwEndpoint = $ "{ config . PromPgw } /metrics";
53+ using var prometheus = new MetricPusher ( promPgwEndpoint , "workload-" + Job ,
54+ intervalMilliseconds : config . ReportPeriod ) ;
55+ prometheus . Start ( ) ;
56+
57+ var writeLimiter = new FixedWindowRateLimiter ( new FixedWindowRateLimiterOptions
58+ {
59+ Window = TimeSpan . FromMilliseconds ( 100 ) , PermitLimit = config . WriteRps / 10 , QueueLimit = int . MaxValue
60+ } ) ;
4861
4962 var cts = new CancellationTokenSource ( ) ;
5063 cts . CancelAfter ( TimeSpan . FromSeconds ( config . Time ) ) ;
@@ -54,38 +67,88 @@ public async Task Run(RunConfig config)
5467 var writeTasks = new List < Task > ( ) ;
5568 for ( var i = 0 ; i < PartitionSize ; i ++ )
5669 {
70+ const string operationType = "write" ;
5771 var producer = "producer-" + ( i + 1 ) ;
5872 messageSending [ producer ] = new ConcurrentQueue < string > ( ) ;
5973
6074 writeTasks . Add (
6175 Task . Run ( async ( ) =>
6276 {
77+ var metricFactory = Metrics . WithLabels ( new Dictionary < string , string >
78+ {
79+ { "operation_type" , operationType } ,
80+ { "sdk" , "dotnet" } ,
81+ { "sdk_version" , Environment . Version . ToString ( ) } ,
82+ { "workload" , Job } ,
83+ { "workload_version" , "0.0.0" }
84+ }
85+ ) ;
86+
87+ var operationsSuccessTotal = metricFactory . CreateCounter (
88+ "sdk_operations_success_total" ,
89+ "Total number of successful operations, categorized by type."
90+ ) ;
91+
92+ var operationLatencySeconds = metricFactory . CreateHistogram (
93+ "sdk_operation_latency_seconds" ,
94+ "Latency of operations performed by the SDK in seconds, categorized by type and status." ,
95+ [ "operation_status" ] ,
96+ new HistogramConfiguration
97+ {
98+ Buckets =
99+ [
100+ 0.001 , // 1 ms
101+ 0.002 , // 2 ms
102+ 0.003 , // 3 ms
103+ 0.004 , // 4 ms
104+ 0.005 , // 5 ms
105+ 0.0075 , // 7.5 ms
106+ 0.010 , // 10 ms
107+ 0.020 , // 20 ms
108+ 0.050 , // 50 ms
109+ 0.100 , // 100 ms
110+ 0.200 , // 200 ms
111+ 0.500 , // 500 ms
112+ 1.000 // 1 s
113+ ]
114+ }
115+ ) ;
116+
63117 using var writer = new WriterBuilder < string > ( driver , PathTopic )
64118 {
65119 ProducerId = producer ,
66120 BufferMaxSize = 8 * 1024 * 1024
67121 } . Build ( ) ;
68122
123+ Logger . LogInformation ( "Started Writer[ProducerId={ProducerId}]" , producer ) ;
124+
69125 while ( ! cts . IsCancellationRequested )
70126 {
71- var tasks = new List < Task > ( ) ;
72- for ( var k = 0 ; k < 10 ; k ++ )
127+ using var lease = await writeLimiter . AcquireAsync ( cancellationToken : cts . Token ) ;
128+
129+ if ( ! lease . IsAcquired )
130+ {
131+ continue ;
132+ }
133+
134+ var textBuilder = new StringBuilder ( ) ;
135+
136+ var size = Random . Shared . Next ( 1 , 100 ) ;
137+ for ( var j = 0 ; j < size ; j ++ )
73138 {
74- var textBuilder = new StringBuilder ( ) ;
75-
76- var size = Random . Shared . Next ( 1 , 100 ) ;
77- for ( var j = 0 ; j < size ; j ++ )
78- {
79- textBuilder . Append ( Guid . NewGuid ( ) ) ;
80- }
81-
82- var data = textBuilder . ToString ( ) ;
83- messageSending [ producer ] . Enqueue ( data ) ;
84- // ReSharper disable once MethodSupportsCancellation
85- tasks . Add ( writer . WriteAsync ( data ) ) ;
139+ textBuilder . Append ( Guid . NewGuid ( ) ) ;
86140 }
87141
88- await Task . WhenAll ( tasks ) ;
142+ var data = textBuilder . ToString ( ) ;
143+ messageSending [ producer ] . Enqueue ( data ) ;
144+
145+ var sw = Stopwatch . StartNew ( ) ;
146+ // ReSharper disable once MethodSupportsCancellation
147+ await writer . WriteAsync ( data ) ;
148+ sw . Stop ( ) ;
149+
150+ operationsSuccessTotal . Inc ( ) ;
151+ operationLatencySeconds . WithLabels ( "success" ) . Observe ( sw . Elapsed . TotalSeconds ) ;
89152 }
90153 } , cts . Token )
91154 ) ;
@@ -95,9 +158,25 @@ public async Task Run(RunConfig config)
95158 for ( var i = 0 ; i < PartitionSize ; i ++ )
96159 {
97160 var handlerBatch = i % 2 == 0 ;
98-
161+ const string operationType = "read" ;
162+ var number = i ;
99163 readTasks . Add ( Task . Run ( async ( ) =>
100164 {
165+ var metricFactory = Metrics . WithLabels ( new Dictionary < string , string >
166+ {
167+ { "operation_type" , operationType } ,
168+ { "sdk" , "dotnet" } ,
169+ { "sdk_version" , Environment . Version . ToString ( ) } ,
170+ { "workload" , Job } ,
171+ { "workload_version" , "0.0.0" }
172+ }
173+ ) ;
174+
175+ var operationsSuccessTotal = metricFactory . CreateCounter (
176+ "sdk_operations_success_total" ,
177+ "Total number of successful operations, categorized by type."
178+ ) ;
179+
101180 using var reader = new ReaderBuilder < string > ( driver )
102181 {
103182 ConsumerName = ConsumerName ,
@@ -108,30 +187,44 @@ public async Task Run(RunConfig config)
108187 MemoryUsageMaxBytes = 8 * 1024 * 1024 ,
109188 } . Build ( ) ;
110189
190+ Logger . LogInformation ( "Started Reader[{Number}]" , number ) ;
191+
111192 if ( handlerBatch )
112193 {
113- await ReadBatchMessages ( cts , reader , messageSending ) ;
194+ await ReadBatchMessages ( cts , reader , messageSending , operationsSuccessTotal ) ;
114195 }
115196 else
116197 {
117- await ReadMessage ( cts , reader , messageSending ) ;
198+ await ReadMessage ( cts , reader , messageSending , operationsSuccessTotal ) ;
118199 }
119200 } , cts . Token ) ) ;
120201 }
121202
122- await Task . WhenAll ( writeTasks ) ;
123- await Task . WhenAll ( readTasks ) ;
203+ try
204+ {
205+ await Task . WhenAll ( writeTasks ) ;
206+ await Task . WhenAll ( readTasks ) ;
207+ }
208+ catch ( OperationCanceledException )
209+ {
210+ }
211+
212+ await prometheus . StopAsync ( ) ;
213+
214+ Logger . LogInformation ( "Task finish!" ) ;
124215 }
125216
126217 private static async Task ReadBatchMessages (
127218 CancellationTokenSource cts ,
128219 IReader < string > reader ,
129- ConcurrentDictionary < string , ConcurrentQueue < string > > localStore
220+ ConcurrentDictionary < string , ConcurrentQueue < string > > localStore ,
221+ Counter messageCounter
130222 )
131223 {
132224 while ( ! cts . IsCancellationRequested )
133225 {
134- var batchMessages = await reader . ReadBatchAsync ( ) ;
226+ var batchMessages = await reader . ReadBatchAsync ( cts . Token ) ;
227+ messageCounter . Inc ( batchMessages . Batch . Count ) ;
135228
136229 foreach ( var message in batchMessages . Batch )
137230 {
@@ -143,12 +236,14 @@ ConcurrentDictionary<string, ConcurrentQueue<string>> localStore
/// <summary>
/// Pulls messages from <paramref name="reader"/> one at a time until
/// <paramref name="cts"/> reports that cancellation was requested.
/// </summary>
/// <param name="cts">Cancellation source polled before every read; its token is also handed to the read call.</param>
/// <param name="reader">Topic reader the messages are taken from.</param>
/// <param name="localStore">Store passed through to <c>CheckMessage</c> together with each received message.</param>
/// <param name="messageCounter">Counter incremented once per message, after <c>CheckMessage</c> has returned.</param>
private static async Task ReadMessage(
    CancellationTokenSource cts,
    IReader<string> reader,
    ConcurrentDictionary<string, ConcurrentQueue<string>> localStore,
    Counter messageCounter
)
{
    for (;;)
    {
        if (cts.IsCancellationRequested)
        {
            return;
        }

        var received = await reader.ReadAsync(cts.Token);
        CheckMessage(localStore, received);

        // Count the message only once it has passed the check above.
        messageCounter.Inc();
    }
}
154249
@@ -159,32 +254,30 @@ private static void CheckMessage(ConcurrentDictionary<string, ConcurrentQueue<st
159254 {
160255 if ( partition . TryDequeue ( out var expectedMessageData ) )
161256 {
162- if ( expectedMessageData ! = message . Data )
257+ if ( expectedMessageData = = message . Data )
163258 {
164- Logger . LogCritical (
165- "Fail assertion messages! expectedData: {ExpectedData}, " +
166- "actualMessage: [Topic: {Topic}, Data: {Data}, ProducerId: {ProducerId}, CreatedAt: {CreatedAt}]" ,
167- expectedMessageData , message . Topic , message . Data , message . ProducerId , message . CreatedAt ) ;
168-
169- throw new Exception ( "FAILED SLO TEST: ASSERT ERROR!" ) ;
259+ return ;
170260 }
171- }
172- else
173- {
261+
174262 Logger . LogCritical (
175- "Unknown message: [Topic: {Topic}, ProducerId: {ProducerId}, CreatedAt: {CreatedAt}]" ,
176- message . Topic , message . ProducerId , message . CreatedAt ) ;
263+ "Fail assertion messages! expectedData: {ExpectedData}, " +
264+ "actualMessage: [Topic: {Topic}, Data: {Data}, ProducerId: {ProducerId}, CreatedAt: {CreatedAt}]" ,
265+ expectedMessageData , message . Topic , message . Data , message . ProducerId , message . CreatedAt ) ;
177266
178- throw new Exception ( "FAILED SLO TEST: UNKNOWN MESSAGE !" ) ;
267+ throw new Exception ( "FAILED SLO TEST: ASSERT ERROR !" ) ;
179268 }
180- }
181- else
182- {
269+
183270 Logger . LogCritical (
184271 "Unknown message: [Topic: {Topic}, ProducerId: {ProducerId}, CreatedAt: {CreatedAt}]" ,
185272 message . Topic , message . ProducerId , message . CreatedAt ) ;
186273
187- throw new Exception ( "FAILED SLO TEST: NOT FOUND PARTITION FOR PRODUCER_ID !" ) ;
274+ throw new Exception ( "FAILED SLO TEST: UNKNOWN MESSAGE !" ) ;
188275 }
276+
277+ Logger . LogCritical (
278+ "Unknown message: [Topic: {Topic}, ProducerId: {ProducerId}, CreatedAt: {CreatedAt}]" ,
279+ message . Topic , message . ProducerId , message . CreatedAt ) ;
280+
281+ throw new Exception ( "FAILED SLO TEST: NOT FOUND PARTITION FOR PRODUCER_ID!" ) ;
189282 }
190283}
0 commit comments