Skip to content

Commit f4858c0

Browse files
authored
ref(stresstest): Add progress bars (#125)
Stresstests can run for a long time, and especially with BigTable the cleanup section has been intransparent. This adds an interactive progress bar which we can use to observe progress. We can improve this progress bar at a later time to render live stats during the test.
1 parent 23549ee commit f4858c0

File tree

4 files changed

+64
-6
lines changed

4 files changed

+64
-6
lines changed

Cargo.lock

Lines changed: 36 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

stresstest/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ bytesize = { version = "2.0.1", features = ["serde"] }
1111
futures = { workspace = true }
1212
futures-util = { workspace = true }
1313
humantime-serde = { workspace = true }
14+
indicatif = "0.18.0"
1415
objectstore-client = { workspace = true }
1516
rand = { workspace = true, features = ["small_rng"] }
1617
rand_distr = "0.5.1"

stresstest/src/stresstest.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use anyhow::Result;
88

99
use bytesize::ByteSize;
1010
use futures::StreamExt;
11+
use indicatif::{ProgressBar, ProgressStyle};
1112
use sketches_ddsketch::DDSketch;
1213
use tokio::sync::Semaphore;
1314
use yansi::Paint;
@@ -30,6 +31,11 @@ pub async fn run(
3031

3132
let remote = Arc::new(remote);
3233

34+
let bar = ProgressBar::new_spinner()
35+
.with_style(ProgressStyle::with_template("{spinner} {msg} {elapsed}")?)
36+
.with_message("Running stresstest:");
37+
bar.enable_steady_tick(Duration::from_millis(100));
38+
3339
// run the workloads concurrently
3440
let tasks: Vec<_> = workloads
3541
.into_iter()
@@ -40,6 +46,7 @@ pub async fn run(
4046
.collect();
4147

4248
let finished_tasks = futures::future::join_all(tasks).await;
49+
bar.finish_and_clear();
4350

4451
let mut total_metrics = WorkloadMetrics::default();
4552
let workloads = finished_tasks.into_iter().map(|task| {
@@ -78,32 +85,46 @@ pub async fn run(
7885

7986
let workloads: Vec<_> = workloads.collect();
8087
let max_concurrency = workloads.iter().map(|w| w.concurrency).max().unwrap();
81-
let files_to_cleanup = workloads.into_iter().flat_map(|mut w| w.external_files());
88+
let files_to_cleanup = workloads.iter().flat_map(|w| w.external_files());
89+
let cleanup_count = workloads.iter().flat_map(|w| w.external_files()).count();
8290

8391
println!();
8492
println!("{}", "## TOTALS".bold());
8593
print_metrics(&total_metrics, duration);
94+
println!();
95+
96+
let bar = ProgressBar::new(cleanup_count as u64)
97+
.with_message("Deleting remaining files...")
98+
.with_style(ProgressStyle::with_template(
99+
"{msg}\n{wide_bar} {pos}/{len}",
100+
)?);
101+
bar.enable_steady_tick(Duration::from_millis(100));
86102

87103
let start = Instant::now();
88104
let cleanup_timing = Arc::new(Mutex::new(DDSketch::default()));
89105
futures::stream::iter(files_to_cleanup)
90106
.for_each_concurrent(max_concurrency, |(usecase, organization_id, object_key)| {
91107
let remote = remote.clone();
92108
let cleanup_timing = cleanup_timing.clone();
109+
let bar = &bar;
93110
async move {
94111
let start = Instant::now();
95-
remote.delete(&usecase, organization_id, &object_key).await;
112+
remote.delete(usecase, *organization_id, object_key).await;
96113
cleanup_timing
97114
.lock()
98115
.unwrap()
99116
.add(start.elapsed().as_secs_f64());
117+
118+
bar.inc(1);
100119
}
101120
})
102121
.await;
122+
123+
bar.finish_and_clear();
124+
103125
let cleanup_duration = start.elapsed();
104126
let cleanup_timing = cleanup_timing.lock().unwrap();
105127

106-
println!();
107128
println!(
108129
"{} ({} files, concurrency: {})",
109130
"## CLEANUP".bold(),

stresstest/src/workload.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,9 +256,9 @@ impl Workload {
256256
self.existing_files.push((internal, external))
257257
}
258258

259-
pub(crate) fn external_files(&mut self) -> impl Iterator<Item = ExternalId> + use<> {
260-
std::mem::take(&mut self.existing_files)
261-
.into_iter()
259+
pub(crate) fn external_files(&self) -> impl Iterator<Item = &ExternalId> {
260+
self.existing_files
261+
.iter()
262262
.map(|(_internal, external)| external)
263263
}
264264
}

0 commit comments

Comments
 (0)