
Commit 8d58f99

Added progress tracker
1 parent fde683f commit 8d58f99

3 files changed: +61 -49 lines changed


sdk/core/azure_core_test/src/perf/config_tests.rs

Lines changed: 4 additions & 1 deletion
@@ -674,7 +674,10 @@ async fn test_perf_runner_with_test_functions() {
         .setup(&test_context)
         .await
         .expect("Setup failed");
-    perf_tests_impl.run(/*&context */).await.expect("Run failed");
+    perf_tests_impl
+        .run(/*&test_context*/)
+        .await
+        .expect("Run failed");
     perf_tests_impl
         .cleanup(&test_context)
         .await

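For orientation, the test above drives a perf test through its full lifecycle (setup, run, cleanup). A minimal sketch of that flow, assuming the `PerfTest` trait and `TestContext` are importable from `azure_core_test` as declared in mod.rs below (the helper name `drive_lifecycle` and the crate paths are illustrative, not from this commit):

    // Sketch only: crate paths and helper name are assumed, not taken from this commit.
    use std::sync::Arc;
    use azure_core_test::{perf::PerfTest, TestContext};

    // Drives one PerfTest through its lifecycle, mirroring the test above.
    async fn drive_lifecycle(test: Arc<dyn PerfTest>, ctx: &TestContext) -> azure_core::Result<()> {
        test.setup(ctx).await?;   // create any resources the test needs
        test.run(/*ctx*/).await?; // one measured operation (context is still commented out)
        test.cleanup(ctx).await?; // tear the resources back down
        Ok(())
    }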
sdk/core/azure_core_test/src/perf/framework_tests.rs

Lines changed: 9 additions & 4 deletions
@@ -40,6 +40,11 @@ fn create_fibonacci1_test(runner: &PerfRunner) -> CreatePerfTestReturn {
     }
     async fn run(&self /*, _context: &TestContext*/) -> azure_core::Result<()> {
         let _result = Self::fibonacci(self.count);
+        // This is a CPU bound test, so yield to allow other tasks to run. Otherwise we jam the tokio scheduler.
+        // Note that this significantly reduces the performance of the test, but it is necessary to allow parallelism.
+        //
+        // In a real-world scenario, the test would be doing async work (e.g. network I/O) which would yield naturally.
+        tokio::task::yield_now().await;
         Ok(())
     }
     async fn cleanup(&self, _context: &TestContext) -> azure_core::Result<()> {
@@ -76,20 +81,20 @@ async fn test_perf_runner_with_single_test() {
         "--iterations",
         "1",
         "--parallel",
-        "10",
+        "30",
         "--duration",
-        "1",
+        "10",
         "--warmup",
         "1",
-        "basic_test",
+        "fibonacci1",
         "-c",
         "10",
     ];
     let runner = PerfRunner::with_command_line(
         env!("CARGO_MANIFEST_DIR"),
         file!(),
         vec![TestMetadata {
-            name: "basic_test",
+            name: "fibonacci1",
             description: "A basic test for testing purposes",
             options: vec![TestOption {
                 name: "count",

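The yield added to the fibonacci test is the usual remedy for CPU-bound work inside a tokio task: without an await point, a tight loop monopolizes its worker thread and the other parallel test tasks are never polled. A minimal, self-contained sketch of the pattern (not the test itself; the function names are illustrative):

    use std::hint::black_box;

    // CPU-bound body that still cooperates with the tokio scheduler.
    async fn cpu_bound_iteration(n: u64) {
        let mut acc: u64 = 0;
        for i in 0..n {
            acc = acc.wrapping_add(i); // stand-in for the real work (e.g. fibonacci)
        }
        black_box(acc);                 // keep the loop from being optimized away
        tokio::task::yield_now().await; // give other tasks on this worker a turn
    }

    #[tokio::main]
    async fn main() {
        // Two looping workers can now interleave, even on a single-threaded runtime.
        let a = tokio::spawn(async { for _ in 0..1_000 { cpu_bound_iteration(10_000).await; } });
        let b = tokio::spawn(async { for _ in 0..1_000 { cpu_bound_iteration(10_000).await; } });
        let _ = tokio::join!(a, b);
    }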
sdk/core/azure_core_test/src/perf/mod.rs

Lines changed: 48 additions & 44 deletions
@@ -3,6 +3,7 @@
 
 #![doc = include_str!("README.md")]
 
+use crate::TestContext;
 use azure_core::{time::Duration, Error, Result};
 use clap::ArgMatches;
 use std::{
@@ -14,15 +15,13 @@ use std::{
         Arc,
     },
 };
-use tokio::select;
-
-use crate::TestContext;
+use tokio::{select, task::JoinSet};
 
 #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
 pub trait PerfTest: Send + Sync {
     async fn setup(&self, context: &TestContext) -> azure_core::Result<()>;
-    async fn run(&self /*context: &TestContext*/) -> azure_core::Result<()>;
+    async fn run(&self /*, context: &TestContext*/) -> azure_core::Result<()>;
     async fn cleanup(&self, context: &TestContext) -> azure_core::Result<()>;
 }
 
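The trait itself is small to implement. A minimal sketch of an implementor using the same `async_trait` attributes (the `NoOpTest` type and the `azure_core_test` import paths are illustrative assumptions, not part of this commit):

    // Sketch only: type name and crate paths are illustrative.
    use azure_core_test::{perf::PerfTest, TestContext};

    struct NoOpTest;

    #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
    #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
    impl PerfTest for NoOpTest {
        async fn setup(&self, _context: &TestContext) -> azure_core::Result<()> {
            Ok(()) // nothing to provision
        }
        async fn run(&self /*, _context: &TestContext*/) -> azure_core::Result<()> {
            tokio::task::yield_now().await; // the measured operation goes here
            Ok(())
        }
        async fn cleanup(&self, _context: &TestContext) -> azure_core::Result<()> {
            Ok(()) // nothing to tear down
        }
    }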
@@ -76,6 +75,7 @@ struct PerfRunnerOptions {
     parallel: usize,
     duration: Duration,
     warmup: Duration,
+    disable_progress: bool,
     test_results_filename: String,
 }
 
@@ -91,6 +91,7 @@ impl From<&ArgMatches> for PerfRunnerOptions {
             parallel: *matches
                 .get_one::<usize>("parallel")
                 .expect("defaulted by clap"),
+            disable_progress: matches.get_flag("no-progress"),
             duration: Duration::seconds(
                 *matches
                     .get_one::<i64>("duration")
@@ -116,6 +117,7 @@ pub struct PerfRunner {
     arguments: ArgMatches,
     package_dir: &'static str,
     module_name: &'static str,
+    progress: Arc<AtomicU64>,
 }
 
 impl PerfRunner {
@@ -132,6 +134,7 @@ impl PerfRunner {
             arguments,
             package_dir,
             module_name,
+            progress: Arc::new(AtomicU64::new(0)),
         })
     }
 
@@ -156,6 +159,7 @@ impl PerfRunner {
             arguments,
             package_dir,
             module_name,
+            progress: Arc::new(AtomicU64::new(0)),
         })
     }
 
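Both constructors now seed the shared progress counter. The underlying pattern is an `Arc<AtomicU64>` cloned into every spawned worker, which bumps it once per completed operation while the runner reads it at reporting time. A self-contained sketch of just that pattern (worker count and loop bounds are arbitrary):

    use std::sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    };

    #[tokio::main]
    async fn main() {
        // One counter shared by every worker task.
        let progress = Arc::new(AtomicU64::new(0));

        let workers: Vec<_> = (0..4)
            .map(|_| {
                let progress = Arc::clone(&progress);
                tokio::spawn(async move {
                    for _ in 0..1_000 {
                        // ... one operation ...
                        progress.fetch_add(1, Ordering::SeqCst);
                        tokio::task::yield_now().await;
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.await.expect("worker panicked");
        }
        println!("total operations: {}", progress.load(Ordering::SeqCst));
    }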
@@ -229,32 +233,26 @@ impl PerfRunner {
             println!("========== Starting test setup ==========");
             test_instance.setup(&context).await?;
 
-            println!("========== Starting test warmup ==========");
+            println!(
+                "========== Starting test warmup for {} ==========",
+                self.options.warmup
+            );
 
-            self.run_test_for(Arc::clone(&test_instance), &context, self.options.warmup)
+            self.run_test_for(Arc::clone(&test_instance), test.name, self.options.warmup)
                 .await?;
 
-            println!("========== Starting test run ==========");
-            println!("Running test for {} seconds", self.options.duration);
-            println!("Parallelism: {}", self.options.parallel);
-            let iteration_count = self
-                .run_test_for(Arc::clone(&test_instance), &context, self.options.duration)
+            println!(
+                "========== Starting test run for {} ==========",
+                self.options.duration
+            );
+            self.run_test_for(Arc::clone(&test_instance), test.name, self.options.duration)
                 .await?;
             if !self.options.no_cleanup {
                 println!("========== Starting test cleanup ==========");
                 test_instance.cleanup(&context).await?;
             }
-            println!("========== Starting test cleanup ==========");
-            test_instance.cleanup(&context).await?;
 
-            println!(
-                "Completed test iteration {}/{} - {} iterations run in {} seconds - {} iterations/second",
-                iteration + 1,
-                self.options.iterations,
-                iteration_count,
-                self.options.duration.as_seconds_f64(),
-                iteration_count as f64 / self.options.duration.as_seconds_f64()
-            );
+            let iteration_count = self.progress.load(Ordering::SeqCst);
             println!(
                 "Completed test iteration {}/{} - {} iterations run in {} seconds - {} seconds/iteration",
                 iteration + 1,
@@ -263,44 +261,49 @@ impl PerfRunner {
                 self.options.duration.as_seconds_f64(),
                 self.options.duration.as_seconds_f64() / iteration_count as f64
             );
+            let operations_per_second =
+                self.options.duration.as_seconds_f64() / iteration_count as f64;
+            let duration_per_operation = Duration::seconds_f64(operations_per_second);
+            println!("{} seconds/operation", duration_per_operation);
         }
         Ok(())
     }
     pub async fn run_test_for(
         &self,
         test_instance: Arc<dyn PerfTest>,
-        _context: &TestContext,
+        _test_name: &str,
         duration: Duration,
-    ) -> azure_core::Result<u64> {
-        let iteration_count = Arc::new(AtomicU64::new(0));
-        let mut tasks = Vec::with_capacity(self.options.parallel);
+    ) -> azure_core::Result<()> {
+        let mut tasks: JoinSet<Result<()>> = JoinSet::new();
         for _ in 0..self.options.parallel {
             let test_instance_clone = Arc::clone(&test_instance);
-            let ic = Arc::clone(&iteration_count);
-            let task: tokio::task::JoinHandle<Result<()>> = tokio::spawn(async move {
+            let progress = self.progress.clone();
+            // let package_dir = self.package_dir;
+            // let module_name = self.module_name;
+            tasks.spawn(async move {
+                // let context =
+                //     TestContext::new(package_dir, module_name, " test_name_copy.as_str()")?;
+
                 loop {
-                    if ic.load(Ordering::SeqCst) % 1000 == 0 {
-                        println!("Iteration {}", ic.load(Ordering::SeqCst));
-                    }
-                    test_instance_clone.run().await?;
-                    ic.fetch_add(1, Ordering::SeqCst);
+                    test_instance_clone.run(/*&context*/).await?;
+                    progress.fetch_add(1, Ordering::SeqCst);
                 }
-                #[allow(unreachable_code)]
-                Ok(())
             });
-            tasks.push(task);
         }
-        let timeout = std::time::Duration::from_secs_f64(duration.as_seconds_f64());
+        let start = tokio::time::Instant::now();
+        let timeout = tokio::time::Duration::from_secs_f64(duration.as_seconds_f64());
         select!(
-            _ = futures::future::join_all(tasks) => {
-                println!("All tasks completed unexpectedly.");
-                // All tasks completed (should not happen in normal operation).
-            }
-            _ = tokio::time::sleep(timeout) => {
-                println!("Duration elapsed, stopping tasks.");
-            }
+            _ = tokio::time::sleep(timeout) => {println!("Timeout reached, stopping test tasks: {:?}", start.elapsed());},
+            _ = tasks.join_all() => {println!("All test tasks completed: {:?}", start.elapsed());},
+            _ = async {
+                loop {
+                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
+                    println!("{:?} elapsed: {} per operation.", start.elapsed(), Duration::seconds_f64( start.elapsed().as_secs_f64() / self.progress.load(Ordering::SeqCst) as f64 ));
+                }
+            }, if !self.options.disable_progress => {},
         );
-        Ok(iteration_count.load(Ordering::SeqCst))
+        println!("Task time elapsed: {:?}", start.elapsed());
+        Ok(())
     }
 
     // * Disable test cleanup
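The reworked run_test_for combines three tokio pieces: a JoinSet holding one looping worker per unit of parallelism, a sleep that bounds the wall-clock duration, and a once-per-second progress reporter, all raced inside a single select!. A stripped-down, self-contained sketch of that control flow follows; the run_for function and its parameters are illustrative, not the runner's API, and the optional disable-progress guard is omitted for brevity:

    use std::sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    };
    use tokio::{select, task::JoinSet};

    // Runs `parallel` looping workers for `secs` seconds and returns the operation count.
    async fn run_for(parallel: usize, secs: u64) -> u64 {
        let progress = Arc::new(AtomicU64::new(0));
        let mut tasks: JoinSet<()> = JoinSet::new();
        for _ in 0..parallel {
            let progress = Arc::clone(&progress);
            tasks.spawn(async move {
                loop {
                    // ... one test operation ...
                    progress.fetch_add(1, Ordering::SeqCst);
                    tokio::task::yield_now().await;
                }
            });
        }

        let start = tokio::time::Instant::now();
        select!(
            // Normal exit: the requested duration elapses.
            _ = tokio::time::sleep(tokio::time::Duration::from_secs(secs)) => {
                println!("Duration elapsed after {:?}", start.elapsed());
            }
            // Workers loop forever in this sketch, so this branch never wins;
            // it exists to catch workers that return early.
            _ = tasks.join_all() => {
                println!("All workers exited after {:?}", start.elapsed());
            }
            // Periodic reporter; it never completes, so it can only lose the race.
            _ = async {
                loop {
                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                    println!("{} operations after {:?}", progress.load(Ordering::SeqCst), start.elapsed());
                }
            } => {}
        );
        // The JoinSet was moved into the join_all branch; when select! drops the
        // losing futures, the set is dropped and the remaining workers are aborted.
        progress.load(Ordering::SeqCst)
    }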
@@ -335,6 +338,7 @@ impl PerfRunner {
                 .value_parser(clap::value_parser!(usize))
                 .global(false),
         )
+        .arg(clap::arg!(--"no-progress" "Disable progress reporting").required(false).global(false))
         .arg(
             clap::arg!(--duration <SECONDS> "The duration of each test in seconds")
                 .required(false)

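The new flag is declared with clap's arg! macro and read back with ArgMatches::get_flag, which is how PerfRunnerOptions::disable_progress gets populated above. A minimal round-trip sketch of that flag handling, assuming clap 4 as used in the diff (the command name "perf" and the sample argument list are arbitrary):

    use clap::Command;

    fn main() {
        // Declare the boolean flag the same way the runner does, then parse a sample command line.
        let matches = Command::new("perf")
            .arg(clap::arg!(--"no-progress" "Disable progress reporting").required(false))
            .get_matches_from(["perf", "--no-progress"]);

        // `get_flag` returns true only when the flag was present.
        let disable_progress = matches.get_flag("no-progress");
        println!("disable_progress = {disable_progress}");
    }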