@@ -47,12 +47,11 @@ await topicClient.CreateTopic(
4747
4848 public async Task Run(RunConfig config)
4949 {
50+ Logger.LogInformation("Started Run topic slo test");
5051 var driver = await Driver.CreateInitialized(
5152 new DriverConfig(config.Endpoint, config.Db), ISloContext.Factory);
52- var promPgwEndpoint = $"{config.PromPgw}/metrics";
53- using var prometheus = new MetricPusher(promPgwEndpoint, "workload-" + Job,
54- intervalMilliseconds: config.ReportPeriod);
55- prometheus.Start();
53+
54+ Logger.LogInformation("Driver is initialized!");
5655
5756 var writeLimiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions
5857 {
@@ -67,53 +66,12 @@ public async Task Run(RunConfig config)
6766 var writeTasks = new List<Task>();
6867 for (var i = 0; i < PartitionSize; i++)
6968 {
70- const string operationType = "write";
7169 var producer = "producer-" + (i + 1);
7270 messageSending[producer] = new ConcurrentQueue<string>();
7371
7472 writeTasks.Add(
7573 Task.Run(async () =>
7674 {
77- var metricFactory = Metrics.WithLabels(new Dictionary<string, string>
78- {
79- { "operation_type", operationType },
80- { "sdk", "dotnet" },
81- { "sdk_version", Environment.Version.ToString() },
82- { "workload", Job },
83- { "workload_version", "0.0.0" }
84- }
85- );
86-
87- var operationsSuccessTotal = metricFactory.CreateCounter(
88- "sdk_operations_success_total",
89- "Total number of successful operations, categorized by type."
90- );
91-
92- var operationLatencySeconds = metricFactory.CreateHistogram(
93- "sdk_operation_latency_seconds",
94- "Latency of operations performed by the SDK in seconds, categorized by type and status.",
95- ["operation_status"],
96- new HistogramConfiguration
97- {
98- Buckets =
99- [
100- 0.001, // 1 ms
101- 0.002, // 2 ms
102- 0.003, // 3 ms
103- 0.004, // 4 ms
104- 0.005, // 5 ms
105- 0.0075, // 7.5 ms
106- 0.010, // 10 ms
107- 0.020, // 20 ms
108- 0.050, // 50 ms
109- 0.100, // 100 ms
110- 0.200, // 200 ms
111- 0.500, // 500 ms
112- 1.000 // 1 s
113- ]
114- }
115- );
116-
11775 using var writer = new WriterBuilder<string>(driver, PathTopic)
11876 {
11977 ProducerId = producer,
@@ -142,13 +100,8 @@ public async Task Run(RunConfig config)
142100 var data = textBuilder.ToString();
143101 messageSending[producer].Enqueue(data);
144102
145- var sw = Stopwatch.StartNew();
146103 // ReSharper disable once MethodSupportsCancellation
147104 await writer.WriteAsync(data);
148- sw.Stop();
149-
150- operationsSuccessTotal.Inc();
151- operationLatencySeconds.WithLabels("success").Observe(sw.Elapsed.TotalSeconds);
152105 }
153106 }, cts.Token)
154107 );
@@ -158,25 +111,9 @@ public async Task Run(RunConfig config)
158111 for (var i = 0; i < PartitionSize; i++)
159112 {
160113 var handlerBatch = i % 2 == 0;
161- const string operationType = "read";
162114 var number = i;
163115 readTasks.Add(Task.Run(async () =>
164116 {
165- var metricFactory = Metrics.WithLabels(new Dictionary<string, string>
166- {
167- { "operation_type", operationType },
168- { "sdk", "dotnet" },
169- { "sdk_version", Environment.Version.ToString() },
170- { "workload", Job },
171- { "workload_version", "0.0.0" }
172- }
173- );
174-
175- var operationsSuccessTotal = metricFactory.CreateCounter(
176- "sdk_operations_success_total",
177- "Total number of successful operations, categorized by type."
178- );
179-
180117 using var reader = new ReaderBuilder<string>(driver)
181118 {
182119 ConsumerName = ConsumerName,
@@ -191,11 +128,11 @@ public async Task Run(RunConfig config)
191128
192129 if (handlerBatch)
193130 {
194- await ReadBatchMessages(cts, reader, messageSending, operationsSuccessTotal );
131+ await ReadBatchMessages(cts, reader, messageSending);
195132 }
196133 else
197134 {
198- await ReadMessage(cts, reader, messageSending, operationsSuccessTotal );
135+ await ReadMessage(cts, reader, messageSending);
199136 }
200137 }, cts.Token));
201138 }
@@ -209,22 +146,18 @@ public async Task Run(RunConfig config)
209146 {
210147 }
211148
212- await prometheus.StopAsync();
213-
214149 Logger.LogInformation("Task finish!");
215150 }
216151
217152 private static async Task ReadBatchMessages(
218153 CancellationTokenSource cts,
219154 IReader<string> reader,
220- ConcurrentDictionary<string, ConcurrentQueue<string>> localStore,
221- Counter messageCounter
155+ ConcurrentDictionary<string, ConcurrentQueue<string>> localStore
222156 )
223157 {
224158 while (!cts.IsCancellationRequested)
225159 {
226160 var batchMessages = await reader.ReadBatchAsync(cts.Token);
227- messageCounter.Inc(batchMessages.Batch.Count);
228161
229162 foreach (var message in batchMessages.Batch)
230163 {
@@ -236,14 +169,12 @@ Counter messageCounter
236169 private static async Task ReadMessage(
237170 CancellationTokenSource cts,
238171 IReader<string> reader,
239- ConcurrentDictionary<string, ConcurrentQueue<string>> localStore,
240- Counter messageCounter
172+ ConcurrentDictionary<string, ConcurrentQueue<string>> localStore
241173 )
242174 {
243175 while (!cts.IsCancellationRequested)
244176 {
245177 CheckMessage(localStore, await reader.ReadAsync(cts.Token));
246- messageCounter.Inc();
247178 }
248179 }
249180
0 commit comments