2020using System . Threading . Tasks ;
2121using Newtonsoft . Json ;
2222using Seq . Api ;
23+ using Seq . Api . Model . Data ;
2324using Seq . Api . Model . Signals ;
2425using SeqCli . Cli . Features ;
2526using SeqCli . Connection ;
@@ -73,7 +74,7 @@ class BenchCommand : Command
7374 string _reportingServerApiKey = "" ;
7475 string _description = "" ;
7576 bool _withIngestion = false ;
76- bool _withQueries = true ;
77+ bool _withQueries = false ;
7778
7879 public BenchCommand ( SeqConnectionFactory connectionFactory )
7980 {
@@ -114,34 +115,74 @@ protected override async Task<int> Run()
114115 {
115116 try
116117 {
117- var ( url , apiKey ) = _connectionFactory . GetConnectionDetails ( _connection ) ;
118+ var ( _ , apiKey ) = _connectionFactory . GetConnectionDetails ( _connection ) ;
118119 var connection = _connectionFactory . Connect ( _connection ) ;
119120 var seqVersion = ( await connection . Client . GetRootAsync ( ) ) . Version ;
120121 await using var reportingLogger = BuildReportingLogger ( ) ;
121122
122123 var runId = Guid . NewGuid ( ) . ToString ( "N" ) [ ..16 ] ;
123124 CancellationTokenSource cancellationTokenSource = new ( ) ;
124125 var cancellationToken = cancellationTokenSource . Token ;
125-
126- if ( _withIngestion )
126+
127+ using ( LogContext . PushProperty ( "RunId" , runId ) )
128+ using ( LogContext . PushProperty ( "WithIngestion" , _withIngestion ) )
129+ using ( LogContext . PushProperty ( "WithQueries" , _withQueries ) )
130+ using ( LogContext . PushProperty ( "Start" , _range . Start ) )
131+ using ( LogContext . PushProperty ( "End" , _range . End ) )
127132 {
128- var t = Task . WhenAll ( IngestionBenchmark ( reportingLogger , runId , connection , apiKey , seqVersion , cancellationToken ) )
129- . ContinueWith ( async t =>
133+ if ( _withIngestion )
134+ {
135+ var t = IngestionBenchmark ( reportingLogger , runId , connection , apiKey , seqVersion ,
136+ isQueryBench : _withQueries , cancellationToken )
137+ . ContinueWith ( async t =>
138+ {
139+ if ( t . Exception is not null )
140+ {
141+ await Console . Error . WriteLineAsync ( t . Exception . Message ) ;
142+ }
143+ } ) ;
144+
145+ if ( ! _withQueries )
130146 {
131- if ( t . Exception is not null )
147+ int benchDurationMs = 120_000 ;
148+ await Task . Delay ( benchDurationMs ) ;
149+ cancellationTokenSource . Cancel ( ) ;
150+
151+ var response = await connection . Data . QueryAsync (
152+ "select count(*) from stream group by time(1s)" ,
153+ DateTime . Now . Add ( - 1 * TimeSpan . FromMilliseconds ( benchDurationMs ) )
154+ ) ;
155+
156+ if ( response . Slices == null )
132157 {
133- await Console . Error . WriteLineAsync ( t . Exception . Message ) ;
158+ throw new Exception ( "Failed to query ingestion benchmark results" ) ;
134159 }
135- } , TaskContinuationOptions . OnlyOnFaulted ) ;
136- await Task . Delay ( 10000 ) ; // how long to ingest before beginning queries
137- }
160+
161+ var counts = response . Slices . Select ( s => Convert . ToDouble ( s . Rows [ 0 ] [ 0 ] ) ) . Where ( c => c > 1000 ) . ToArray ( ) ;
162+ var countsMean = counts . Sum ( ) / counts . Length ;
163+ var countsRSD = QueryBenchCaseTimings . StandardDeviation ( counts ) / countsMean ;
164+
165+ using ( LogContext . PushProperty ( "RunId" , runId ) )
166+ using ( LogContext . PushProperty ( "EventsPerSecond" , counts ) )
167+ {
168+ reportingLogger . Information (
169+ "Ingestion benchmark {Description} ran for {RunDuration:N0}ms; ingested {TotalIngested:N0} at {EventsPerMinute:N0}events/min; with RSD {RelativeStandardDeviationPercentage,4:N1}%" ,
170+ _description ,
171+ benchDurationMs ,
172+ counts . Sum ( ) ,
173+ countsMean * 60 ,
174+ countsRSD * 100 ) ;
175+ }
176+ }
177+ }
138178
139- if ( _withQueries )
140- {
141- var collectedTimings = await QueryBenchmark ( reportingLogger , runId , connection , seqVersion ) ;
142- collectedTimings . LogSummary ( ) ;
179+ if ( _withQueries )
180+ {
181+ var collectedTimings = await QueryBenchmark ( reportingLogger , runId , connection , seqVersion ) ;
182+ collectedTimings . LogSummary ( _description ) ;
183+ cancellationTokenSource . Cancel ( ) ;
184+ }
143185 }
144- cancellationTokenSource . Cancel ( ) ;
145186
146187 return 0 ;
147188 }
@@ -152,69 +193,85 @@ protected override async Task<int> Run()
152193 }
153194 }
154195
155- Task [ ] IngestionBenchmark ( Logger reportingLogger , string runId , SeqConnection connection , string ? apiKey , string seqVersion , CancellationToken cancellationToken = default )
196+ async Task IngestionBenchmark ( Logger reportingLogger , string runId , SeqConnection connection , string ? apiKey ,
197+ string seqVersion , bool isQueryBench , CancellationToken cancellationToken = default )
156198 {
157- return Enumerable . Range ( 1 , 1000 )
158- . Select ( i => Simulation . RunAsync ( connection , apiKey , 10000 , echoToStdout : false , cancellationToken ) )
159- . ToArray ( ) ;
199+ using ( ! string . IsNullOrWhiteSpace ( _description )
200+ ? LogContext . PushProperty ( "Description" , _description )
201+ : null )
202+ {
203+ reportingLogger . Information (
204+ "Ingestion bench run {RunId} against {ServerUrl} ({SeqVersion})" ,
205+ runId , connection . Client . ServerUrl , seqVersion ) ;
206+ }
207+
208+ if ( isQueryBench )
209+ {
210+ var simulationTasks = Enumerable . Range ( 1 , 500 )
211+ . Select ( i => Simulation . RunAsync ( connection , apiKey , 10000 , echoToStdout : false , cancellationToken ) )
212+ . ToArray ( ) ;
213+ await Task . Delay ( 20_000 ) ; // how long to ingest before beginning queries
214+ }
215+ else
216+ {
217+ var simulationTasks = Enumerable . Range ( 1 , 2000 )
218+ . Select ( i => Simulation . RunAsync ( connection , apiKey , 10000 , echoToStdout : false , cancellationToken ) )
219+ . ToArray ( ) ;
220+ }
160221 }
161222
162223 async Task < CollectedTimings > QueryBenchmark ( Logger reportingLogger , string runId , SeqConnection connection , string seqVersion )
163224 {
164225 var cases = ReadCases ( _cases ) ;
165- CollectedTimings collectedTimings = new ( ) ;
226+ CollectedTimings collectedTimings = new ( reportingLogger ) ;
166227 using ( ! string . IsNullOrWhiteSpace ( _description )
167228 ? LogContext . PushProperty ( "Description" , _description )
168229 : null )
169230 {
170231 reportingLogger . Information (
171- "Bench run {RunId} against {ServerUrl} ({SeqVersion}); {CaseCount} cases, {Runs} runs, from {Start} to {End}" ,
232+ "Query bench run {RunId} against {ServerUrl} ({SeqVersion}); {CaseCount} cases, {Runs} runs, from {Start} to {End}" ,
172233 runId , connection . Client . ServerUrl , seqVersion , cases . Cases . Count , _runs , _range . Start , _range . End ) ;
173234 }
174235
175- using ( LogContext . PushProperty ( "RunId" , runId ) )
176- using ( LogContext . PushProperty ( "Start" , _range . Start ) )
177- using ( LogContext . PushProperty ( "End" , _range . End ) )
236+ foreach ( var c in cases . Cases . OrderBy ( c => c . Id )
237+ . Concat ( new [ ] { CollectedTimings . FINAL_COUNT_CASE } ) )
178238 {
179- foreach ( var c in cases . Cases . OrderBy ( c => c . Id )
180- . Concat ( new [ ] { CollectedTimings . FINAL_COUNT_CASE } ) )
239+ var timings = new QueryBenchCaseTimings ( c ) ;
240+ collectedTimings . Add ( timings ) ;
241+
242+ foreach ( var i in Enumerable . Range ( 1 , _runs ) )
181243 {
182- var timings = new BenchCaseTimings ( c ) ;
183- collectedTimings . Add ( timings ) ;
184- object ? lastResult = null ;
244+ var response = await connection . Data . QueryAsync (
245+ c . Query ,
246+ _range . Start ,
247+ _range . End ,
248+ c . SignalExpression != null ? SignalExpressionPart . Signal ( c . SignalExpression ) : null ,
249+ null ,
250+ TimeSpan . FromMinutes ( 4 )
251+ ) ;
252+
253+ timings . PushElapsed ( response . Statistics . ElapsedMilliseconds ) ;
185254
186- foreach ( var i in Enumerable . Range ( 1 , _runs ) )
255+ if ( response . Rows != null )
187256 {
188- var response = await connection . Data . QueryAsync (
189- c . Query ,
190- _range . Start ,
191- _range . End ,
192- c . SignalExpression != null ? SignalExpressionPart . Signal ( c . SignalExpression ) : null
193- ) ;
194-
195- timings . PushElapsed ( response . Statistics . ElapsedMilliseconds ) ;
196-
197- if ( response . Rows != null )
257+ var isScalarResult = response . Rows . Length == 1 && response . Rows [ 0 ] . Length == 1 ;
258+ if ( isScalarResult && i == _runs )
198259 {
199- var isScalarResult = response . Rows . Length == 1 && response . Rows [ 0 ] . Length == 1 ;
200- if ( isScalarResult && i == _runs )
201- {
202- lastResult = response . Rows [ 0 ] [ 0 ] ;
203- }
260+ timings . LastResult = response . Rows [ 0 ] [ 0 ] ;
204261 }
205262 }
263+ }
206264
207- using ( lastResult != null ? LogContext . PushProperty ( "LastResult" , lastResult ) : null )
208- using ( ! string . IsNullOrWhiteSpace ( c . SignalExpression )
209- ? LogContext . PushProperty ( "SignalExpression" , c . SignalExpression )
210- : null )
211- using ( LogContext . PushProperty ( "StandardDeviationElapsed" , timings . StandardDeviationElapsed ) )
212- using ( LogContext . PushProperty ( "Query" , c . Query ) )
213- {
214- reportingLogger . Information (
215- "Case {Id,-40} ({LastResult}) mean {MeanElapsed,5:N0} ms (first {FirstElapsed,5:N0} ms, min {MinElapsed,5:N0} ms, max {MaxElapsed,5:N0} ms, RSD {RelativeStandardDeviationElapsed,4:N2})" ,
216- c . Id , lastResult , timings . MeanElapsed , timings . FirstElapsed , timings . MinElapsed , timings . MaxElapsed , timings . RelativeStandardDeviationElapsed ) ;
217- }
265+ using ( timings . LastResult != null ? LogContext . PushProperty ( "LastResult" , timings . LastResult ) : null )
266+ using ( ! string . IsNullOrWhiteSpace ( c . SignalExpression )
267+ ? LogContext . PushProperty ( "SignalExpression" , c . SignalExpression )
268+ : null )
269+ using ( LogContext . PushProperty ( "StandardDeviationElapsed" , timings . StandardDeviationElapsed ) )
270+ using ( LogContext . PushProperty ( "Query" , c . Query ) )
271+ {
272+ reportingLogger . Information (
273+ "Case {Id,-40} ({LastResult}) mean {MeanElapsed,5:N0} ms (first {FirstElapsed,5:N0} ms, min {MinElapsed,5:N0} ms, max {MaxElapsed,5:N0} ms, RSD {RelativeStandardDeviationElapsed,4:N2})" ,
274+ c . Id , timings . LastResult , timings . MeanElapsed , timings . FirstElapsed , timings . MinElapsed , timings . MaxElapsed , timings . RelativeStandardDeviationElapsed ) ;
218275 }
219276 }
220277
0 commit comments