Skip to content

Commit 2beb430

Browse files
committed
WIP
1 parent 6bde141 commit 2beb430

File tree

3 files changed

+130
-53
lines changed

3 files changed

+130
-53
lines changed

src/SeqCli/Cli/Commands/Bench/BenchCaseTimings.cs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,33 @@
1818

1919
namespace SeqCli.Cli.Commands.Bench;
2020

21+
class CollectedTimings
22+
{
23+
List<BenchCaseTimings> _collectedTimings = new();
24+
25+
public static BenchCase FINAL_COUNT_CASE = new BenchCase()
26+
{
27+
Id = "final-count-star",
28+
Query = "select count(*) from stream",
29+
};
30+
31+
public void Add(BenchCaseTimings caseTimings)
32+
{
33+
_collectedTimings.Add(caseTimings);
34+
}
35+
36+
public void LogSummary()
37+
{
38+
throw new NotImplementedException();
39+
}
40+
}
41+
2142
/*
2243
* Collects benchmarking elapsed time measurements and calculates statistics.
2344
*/
2445
class BenchCaseTimings
2546
{
47+
readonly BenchCase _benchCase;
2648
readonly List<double> _timings = new();
2749

2850
public double MeanElapsed => _timings.Sum() / _timings.Count;
@@ -32,6 +54,11 @@ class BenchCaseTimings
3254
public double StandardDeviationElapsed => StandardDeviation(_timings);
3355
public double RelativeStandardDeviationElapsed => StandardDeviation(_timings) / MeanElapsed;
3456

57+
public BenchCaseTimings(BenchCase benchCase)
58+
{
59+
_benchCase = benchCase;
60+
}
61+
3562
public void PushElapsed(double elapsed)
3663
{
3764
_timings.Add(elapsed);

src/SeqCli/Cli/Commands/Bench/BenchCommand.cs

Lines changed: 100 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,17 @@
1313
// limitations under the License.
1414

1515
using System;
16+
using System.Collections.Generic;
1617
using System.IO;
1718
using System.Linq;
18-
using System.Security.Cryptography;
19-
using System.Text;
19+
using System.Threading;
2020
using System.Threading.Tasks;
2121
using Newtonsoft.Json;
22+
using Seq.Api;
2223
using Seq.Api.Model.Signals;
2324
using SeqCli.Cli.Features;
2425
using SeqCli.Connection;
26+
using SeqCli.Sample.Loader;
2527
using SeqCli.Util;
2628
using Serilog;
2729
using Serilog.Context;
@@ -70,6 +72,8 @@ class BenchCommand : Command
7072
string _reportingServerUrl = "";
7173
string _reportingServerApiKey = "";
7274
string _description = "";
75+
bool _withIngestion = false;
76+
bool _withQueries = true;
7377

7478
public BenchCommand(SeqConnectionFactory connectionFactory)
7579
{
@@ -96,72 +100,48 @@ public BenchCommand(SeqConnectionFactory connectionFactory)
96100
"description=",
97101
"Optional description of the bench test run",
98102
a => _description = a);
103+
Options.Add(
104+
"with-ingestion",
105+
"Should the benchmark include sending events to Seq",
106+
_ => _withIngestion = true);
107+
Options.Add(
108+
"with-queries",
109+
"Should the benchmark include querying Seq",
110+
_ => _withQueries = true);
99111
}
100112

101113
protected override async Task<int> Run()
102114
{
103115
try
104116
{
117+
var (url, apiKey) = _connectionFactory.GetConnectionDetails(_connection);
105118
var connection = _connectionFactory.Connect(_connection);
106119
var seqVersion = (await connection.Client.GetRootAsync()).Version;
107-
108-
var cases = ReadCases(_cases);
109-
var runId = Guid.NewGuid().ToString("N")[..16];
110-
111120
await using var reportingLogger = BuildReportingLogger();
112121

113-
using (!string.IsNullOrWhiteSpace(_description)
114-
? LogContext.PushProperty("Description", _description)
115-
: null)
116-
{
117-
reportingLogger.Information(
118-
"Bench run {RunId} against {ServerUrl} ({SeqVersion}); {CaseCount} cases, {Runs} runs, from {Start} to {End}",
119-
runId, connection.Client.ServerUrl, seqVersion, cases.Cases.Count, _runs, _range.Start, _range.End);
120-
}
122+
var runId = Guid.NewGuid().ToString("N")[..16];
123+
CancellationTokenSource cancellationTokenSource = new ();
124+
var cancellationToken = cancellationTokenSource.Token;
121125

122-
using (LogContext.PushProperty("RunId", runId))
123-
using (LogContext.PushProperty("Start", _range.Start))
124-
using (LogContext.PushProperty("End", _range.End))
126+
if (_withIngestion)
125127
{
126-
foreach (var c in cases.Cases.OrderBy(c => c.Id))
127-
{
128-
var timings = new BenchCaseTimings();
129-
object? lastResult = null;
130-
131-
foreach (var i in Enumerable.Range(1, _runs))
128+
var t = Task.WhenAll(IngestionBenchmark(reportingLogger, runId, connection, apiKey, seqVersion, cancellationToken))
129+
.ContinueWith(async t =>
132130
{
133-
var response = await connection.Data.QueryAsync(
134-
c.Query,
135-
_range.Start,
136-
_range.End,
137-
c.SignalExpression != null ? SignalExpressionPart.Signal(c.SignalExpression) : null
138-
);
139-
140-
timings.PushElapsed(response.Statistics.ElapsedMilliseconds);
141-
142-
if (response.Rows != null)
131+
if (t.Exception is not null)
143132
{
144-
var isScalarResult = response.Rows.Length == 1 && response.Rows[0].Length == 1;
145-
if (isScalarResult && i == _runs)
146-
{
147-
lastResult = response.Rows[0][0];
148-
}
133+
await Console.Error.WriteLineAsync(t.Exception.Message);
149134
}
150-
}
135+
}, TaskContinuationOptions.OnlyOnFaulted);
136+
await Task.Delay(10000); // how long to ingest before beginning queries
137+
}
151138

152-
using (lastResult != null ? LogContext.PushProperty("LastResult", lastResult) : null)
153-
using (!string.IsNullOrWhiteSpace(c.SignalExpression)
154-
? LogContext.PushProperty("SignalExpression", c.SignalExpression)
155-
: null)
156-
using (LogContext.PushProperty("StandardDeviationElapsed", timings.StandardDeviationElapsed))
157-
using (LogContext.PushProperty("Query", c.Query))
158-
{
159-
reportingLogger.Information(
160-
"Case {Id,-40} mean {MeanElapsed,5:N0} ms (first {FirstElapsed,5:N0} ms, min {MinElapsed,5:N0} ms, max {MaxElapsed,5:N0} ms, RSD {RelativeStandardDeviationElapsed,4:N2})",
161-
c.Id, timings.MeanElapsed, timings.FirstElapsed, timings.MinElapsed, timings.MaxElapsed, timings.RelativeStandardDeviationElapsed);
162-
}
163-
}
139+
if (_withQueries)
140+
{
141+
var collectedTimings = await QueryBenchmark(reportingLogger, runId, connection, seqVersion);
142+
collectedTimings.LogSummary();
164143
}
144+
cancellationTokenSource.Cancel();
165145

166146
return 0;
167147
}
@@ -172,6 +152,75 @@ protected override async Task<int> Run()
172152
}
173153
}
174154

155+
Task[] IngestionBenchmark(Logger reportingLogger, string runId, SeqConnection connection, string? apiKey, string seqVersion, CancellationToken cancellationToken = default)
156+
{
157+
return Enumerable.Range(1, 1000)
158+
.Select(i => Simulation.RunAsync(connection, apiKey, 10000, echoToStdout: false, cancellationToken))
159+
.ToArray();
160+
}
161+
162+
async Task<CollectedTimings> QueryBenchmark(Logger reportingLogger, string runId, SeqConnection connection, string seqVersion)
163+
{
164+
var cases = ReadCases(_cases);
165+
CollectedTimings collectedTimings = new();
166+
using (!string.IsNullOrWhiteSpace(_description)
167+
? LogContext.PushProperty("Description", _description)
168+
: null)
169+
{
170+
reportingLogger.Information(
171+
"Bench run {RunId} against {ServerUrl} ({SeqVersion}); {CaseCount} cases, {Runs} runs, from {Start} to {End}",
172+
runId, connection.Client.ServerUrl, seqVersion, cases.Cases.Count, _runs, _range.Start, _range.End);
173+
}
174+
175+
using (LogContext.PushProperty("RunId", runId))
176+
using (LogContext.PushProperty("Start", _range.Start))
177+
using (LogContext.PushProperty("End", _range.End))
178+
{
179+
foreach (var c in cases.Cases.OrderBy(c => c.Id)
180+
.Concat(new [] { CollectedTimings.FINAL_COUNT_CASE }))
181+
{
182+
var timings = new BenchCaseTimings(c);
183+
collectedTimings.Add(timings);
184+
object? lastResult = null;
185+
186+
foreach (var i in Enumerable.Range(1, _runs))
187+
{
188+
var response = await connection.Data.QueryAsync(
189+
c.Query,
190+
_range.Start,
191+
_range.End,
192+
c.SignalExpression != null ? SignalExpressionPart.Signal(c.SignalExpression) : null
193+
);
194+
195+
timings.PushElapsed(response.Statistics.ElapsedMilliseconds);
196+
197+
if (response.Rows != null)
198+
{
199+
var isScalarResult = response.Rows.Length == 1 && response.Rows[0].Length == 1;
200+
if (isScalarResult && i == _runs)
201+
{
202+
lastResult = response.Rows[0][0];
203+
}
204+
}
205+
}
206+
207+
using (lastResult != null ? LogContext.PushProperty("LastResult", lastResult) : null)
208+
using (!string.IsNullOrWhiteSpace(c.SignalExpression)
209+
? LogContext.PushProperty("SignalExpression", c.SignalExpression)
210+
: null)
211+
using (LogContext.PushProperty("StandardDeviationElapsed", timings.StandardDeviationElapsed))
212+
using (LogContext.PushProperty("Query", c.Query))
213+
{
214+
reportingLogger.Information(
215+
"Case {Id,-40} ({LastResult}) mean {MeanElapsed,5:N0} ms (first {FirstElapsed,5:N0} ms, min {MinElapsed,5:N0} ms, max {MaxElapsed,5:N0} ms, RSD {RelativeStandardDeviationElapsed,4:N2})",
216+
c.Id, lastResult, timings.MeanElapsed, timings.FirstElapsed, timings.MinElapsed, timings.MaxElapsed, timings.RelativeStandardDeviationElapsed);
217+
}
218+
}
219+
}
220+
221+
return collectedTimings;
222+
}
223+
175224
/// <summary>
176225
/// Build a second Serilog logger for logging benchmark results.
177226
/// </summary>

src/SeqCli/Sample/Loader/Simulation.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
using System.Threading;
1516
using System.Threading.Tasks;
1617
using Seq.Api;
1718
using SeqCli.Ingestion;
@@ -21,7 +22,7 @@ namespace SeqCli.Sample.Loader;
2122

2223
static class Simulation
2324
{
24-
public static async Task RunAsync(SeqConnection connection, string? apiKey, int batchSize, bool echoToStdout)
25+
public static async Task RunAsync(SeqConnection connection, string? apiKey, int batchSize, bool echoToStdout, CancellationToken cancellationToken = default)
2526
{
2627
var buffer = new BufferingSink();
2728

@@ -36,7 +37,7 @@ public static async Task RunAsync(SeqConnection connection, string? apiKey, int
3637
var ship = Task.Run(() => LogShipper.ShipEvents(connection, apiKey, buffer,
3738
InvalidDataHandling.Fail, SendFailureHandling.Continue, batchSize));
3839

39-
await Roastery.Program.Main(logger);
40+
await Roastery.Program.Main(logger, cancellationToken);
4041
await logger.DisposeAsync();
4142
await ship;
4243
}

0 commit comments

Comments
 (0)