3
3
4
4
#![ doc = include_str ! ( "README.md" ) ]
5
5
6
+ use crate :: TestContext ;
6
7
use azure_core:: { time:: Duration , Error , Result } ;
7
8
use clap:: ArgMatches ;
8
9
use std:: {
@@ -14,15 +15,13 @@ use std::{
14
15
Arc ,
15
16
} ,
16
17
} ;
17
- use tokio:: select;
18
-
19
- use crate :: TestContext ;
18
+ use tokio:: { select, task:: JoinSet } ;
20
19
21
20
#[ cfg_attr( target_arch = "wasm32" , async_trait:: async_trait( ?Send ) ) ]
22
21
#[ cfg_attr( not( target_arch = "wasm32" ) , async_trait:: async_trait) ]
23
22
pub trait PerfTest : Send + Sync {
24
23
async fn setup ( & self , context : & TestContext ) -> azure_core:: Result < ( ) > ;
25
- async fn run ( & self /*context: &TestContext*/ ) -> azure_core:: Result < ( ) > ;
24
+ async fn run ( & self /*, context: &TestContext*/ ) -> azure_core:: Result < ( ) > ;
26
25
async fn cleanup ( & self , context : & TestContext ) -> azure_core:: Result < ( ) > ;
27
26
}
28
27
@@ -76,6 +75,7 @@ struct PerfRunnerOptions {
76
75
parallel : usize ,
77
76
duration : Duration ,
78
77
warmup : Duration ,
78
+ disable_progress : bool ,
79
79
test_results_filename : String ,
80
80
}
81
81
@@ -91,6 +91,7 @@ impl From<&ArgMatches> for PerfRunnerOptions {
91
91
parallel : * matches
92
92
. get_one :: < usize > ( "parallel" )
93
93
. expect ( "defaulted by clap" ) ,
94
+ disable_progress : matches. get_flag ( "no-progress" ) ,
94
95
duration : Duration :: seconds (
95
96
* matches
96
97
. get_one :: < i64 > ( "duration" )
@@ -116,6 +117,7 @@ pub struct PerfRunner {
116
117
arguments : ArgMatches ,
117
118
package_dir : & ' static str ,
118
119
module_name : & ' static str ,
120
+ progress : Arc < AtomicU64 > ,
119
121
}
120
122
121
123
impl PerfRunner {
@@ -132,6 +134,7 @@ impl PerfRunner {
132
134
arguments,
133
135
package_dir,
134
136
module_name,
137
+ progress : Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
135
138
} )
136
139
}
137
140
@@ -156,6 +159,7 @@ impl PerfRunner {
156
159
arguments,
157
160
package_dir,
158
161
module_name,
162
+ progress : Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
159
163
} )
160
164
}
161
165
@@ -229,32 +233,26 @@ impl PerfRunner {
229
233
println ! ( "========== Starting test setup ==========" ) ;
230
234
test_instance. setup ( & context) . await ?;
231
235
232
- println ! ( "========== Starting test warmup ==========" ) ;
236
+ println ! (
237
+ "========== Starting test warmup for {} ==========" ,
238
+ self . options. warmup
239
+ ) ;
233
240
234
- self . run_test_for ( Arc :: clone ( & test_instance) , & context , self . options . warmup )
241
+ self . run_test_for ( Arc :: clone ( & test_instance) , test . name , self . options . warmup )
235
242
. await ?;
236
243
237
- println ! ( "========== Starting test run ==========" ) ;
238
- println ! ( "Running test for {} seconds" , self . options . duration ) ;
239
- println ! ( "Parallelism: {}" , self . options. parallel ) ;
240
- let iteration_count = self
241
- . run_test_for ( Arc :: clone ( & test_instance) , & context , self . options . duration )
244
+ println ! (
245
+ "========== Starting test run for {} ==========" ,
246
+ self . options. duration
247
+ ) ;
248
+ self . run_test_for ( Arc :: clone ( & test_instance) , test . name , self . options . duration )
242
249
. await ?;
243
250
if !self . options . no_cleanup {
244
251
println ! ( "========== Starting test cleanup ==========" ) ;
245
252
test_instance. cleanup ( & context) . await ?;
246
253
}
247
- println ! ( "========== Starting test cleanup ==========" ) ;
248
- test_instance. cleanup ( & context) . await ?;
249
254
250
- println ! (
251
- "Completed test iteration {}/{} - {} iterations run in {} seconds - {} iterations/second" ,
252
- iteration + 1 ,
253
- self . options. iterations,
254
- iteration_count,
255
- self . options. duration. as_seconds_f64( ) ,
256
- iteration_count as f64 / self . options. duration. as_seconds_f64( )
257
- ) ;
255
+ let iteration_count = self . progress . load ( Ordering :: SeqCst ) ;
258
256
println ! (
259
257
"Completed test iteration {}/{} - {} iterations run in {} seconds - {} seconds/iteration" ,
260
258
iteration + 1 ,
@@ -263,44 +261,49 @@ impl PerfRunner {
263
261
self . options. duration. as_seconds_f64( ) ,
264
262
self . options. duration. as_seconds_f64( ) / iteration_count as f64
265
263
) ;
264
+ let operations_per_second =
265
+ self . options . duration . as_seconds_f64 ( ) / iteration_count as f64 ;
266
+ let duration_per_operation = Duration :: seconds_f64 ( operations_per_second) ;
267
+ println ! ( "{} seconds/operation" , duration_per_operation) ;
266
268
}
267
269
Ok ( ( ) )
268
270
}
269
271
pub async fn run_test_for (
270
272
& self ,
271
273
test_instance : Arc < dyn PerfTest > ,
272
- _context : & TestContext ,
274
+ _test_name : & str ,
273
275
duration : Duration ,
274
- ) -> azure_core:: Result < u64 > {
275
- let iteration_count = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
276
- let mut tasks = Vec :: with_capacity ( self . options . parallel ) ;
276
+ ) -> azure_core:: Result < ( ) > {
277
+ let mut tasks: JoinSet < Result < ( ) > > = JoinSet :: new ( ) ;
277
278
for _ in 0 ..self . options . parallel {
278
279
let test_instance_clone = Arc :: clone ( & test_instance) ;
279
- let ic = Arc :: clone ( & iteration_count) ;
280
- let task: tokio:: task:: JoinHandle < Result < ( ) > > = tokio:: spawn ( async move {
280
+ let progress = self . progress . clone ( ) ;
281
+ // let package_dir = self.package_dir;
282
+ // let module_name = self.module_name;
283
+ tasks. spawn ( async move {
284
+ // let context =
285
+ // TestContext::new(package_dir, module_name, " test_name_copy.as_str()")?;
286
+
281
287
loop {
282
- if ic. load ( Ordering :: SeqCst ) % 1000 == 0 {
283
- println ! ( "Iteration {}" , ic. load( Ordering :: SeqCst ) ) ;
284
- }
285
- test_instance_clone. run ( ) . await ?;
286
- ic. fetch_add ( 1 , Ordering :: SeqCst ) ;
288
+ test_instance_clone. run ( /*&context*/ ) . await ?;
289
+ progress. fetch_add ( 1 , Ordering :: SeqCst ) ;
287
290
}
288
- #[ allow( unreachable_code) ]
289
- Ok ( ( ) )
290
291
} ) ;
291
- tasks. push ( task) ;
292
292
}
293
- let timeout = std:: time:: Duration :: from_secs_f64 ( duration. as_seconds_f64 ( ) ) ;
293
+ let start = tokio:: time:: Instant :: now ( ) ;
294
+ let timeout = tokio:: time:: Duration :: from_secs_f64 ( duration. as_seconds_f64 ( ) ) ;
294
295
select ! (
295
- _ = futures:: future:: join_all( tasks) => {
296
- println!( "All tasks completed unexpectedly." ) ;
297
- // All tasks completed (should not happen in normal operation).
298
- }
299
- _ = tokio:: time:: sleep( timeout) => {
300
- println!( "Duration elapsed, stopping tasks." ) ;
301
- }
296
+ _ = tokio:: time:: sleep( timeout) => { println!( "Timeout reached, stopping test tasks: {:?}" , start. elapsed( ) ) ; } ,
297
+ _ = tasks. join_all( ) => { println!( "All test tasks completed: {:?}" , start. elapsed( ) ) ; } ,
298
+ _ = async {
299
+ loop {
300
+ tokio:: time:: sleep( tokio:: time:: Duration :: from_secs( 1 ) ) . await ;
301
+ println!( "{:?} elapsed: {} per operation." , start. elapsed( ) , Duration :: seconds_f64( start. elapsed( ) . as_secs_f64( ) / self . progress. load( Ordering :: SeqCst ) as f64 ) ) ;
302
+ }
303
+ } , if !self . options. disable_progress => { } ,
302
304
) ;
303
- Ok ( iteration_count. load ( Ordering :: SeqCst ) )
305
+ println ! ( "Task time elapsed: {:?}" , start. elapsed( ) ) ;
306
+ Ok ( ( ) )
304
307
}
305
308
306
309
// * Disable test cleanup
@@ -335,6 +338,7 @@ impl PerfRunner {
335
338
. value_parser ( clap:: value_parser!( usize ) )
336
339
. global ( false ) ,
337
340
)
341
+ . arg ( clap:: arg!( --"no-progress" "Disable progress reporting" ) . required ( false ) . global ( false ) )
338
342
. arg (
339
343
clap:: arg!( --duration <SECONDS > "The duration of each test in seconds" )
340
344
. required ( false )
0 commit comments