Skip to content
This repository was archived by the owner on Jan 30, 2026. It is now read-only.

Commit 2b2a17f

Browse files
authored
refactor: --catchup-delay arg for bench (#203)
1 parent 10295a8 commit 2b2a17f

File tree

6 files changed

+94
-98
lines changed

6 files changed

+94
-98
lines changed

Cargo.lock

Lines changed: 18 additions & 18 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,40 +14,40 @@ path = "src/main.rs"
1414

1515
[dependencies]
1616
async-stream = "0.3.6"
17-
base64ct = { version = "1.7.3", features = ["alloc"] }
18-
bytes = "1.10.1"
19-
clap = { version = "4.5.38", features = ["derive"] }
17+
base64ct = { version = "1.8.3", features = ["alloc"] }
18+
bytes = "1.11.0"
19+
clap = { version = "4.5.54", features = ["derive"] }
2020
color-print = "0.3.7"
21-
colored = "3.0.0"
22-
config = "0.15.11"
21+
colored = "3.1.1"
22+
config = "0.15.19"
2323
dirs = "6.0.0"
2424
futures = "0.3.31"
25-
http = "1.3.1"
26-
humantime = "2.2.0"
25+
http = "1.4.0"
26+
humantime = "2.3.0"
2727
indicatif = "0.18.3"
2828
json_to_table = "0.12.0"
2929
miette = { version = "7.6.0", features = ["fancy"] }
30-
rand = "0.9.1"
30+
rand = "0.9.2"
3131
s2-sdk = { version = "0.22.4", features = ["_hidden"] }
32-
serde = { version = "1.0.219", features = ["derive"] }
33-
serde_json = { version = "1.0.140", features = ["preserve_order"] }
32+
serde = { version = "1.0.228", features = ["derive"] }
33+
serde_json = { version = "1.0.149", features = ["preserve_order"] }
3434
strum = { version = "0.27", features = ["derive"] }
3535
tabled = "0.20.0"
36-
thiserror = "2.0.12"
37-
tokio = { version = "1.45.1", features = ["full"] }
38-
tokio-stream = { version = "0.1.17", features = ["io-util"] }
36+
thiserror = "2.0.18"
37+
tokio = { version = "1.49.0", features = ["full"] }
38+
tokio-stream = { version = "0.1.18", features = ["io-util"] }
3939
toml = "0.9.11"
40-
tracing = "0.1.41"
41-
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
40+
tracing = "0.1.44"
41+
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
4242
uuid = { version = "1.19.0", features = ["v4"] }
4343
xxhash-rust = { version = "0.8.15", features = ["xxh3"] }
4444

4545
[dev-dependencies]
46-
assert_cmd = "2.0"
46+
assert_cmd = "2.1"
4747
predicates = "3.1"
48-
rstest = "0.26.0"
49-
serial_test = "3.2"
50-
tempfile = "3.15"
48+
rstest = "0.26.1"
49+
serial_test = "3.3"
50+
tempfile = "3.24"
5151

5252
[profile.release]
5353
lto = true

src/bench.rs

Lines changed: 51 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ pub async fn run(
424424
record_size: usize,
425425
target_mibps: NonZeroU64,
426426
duration: Duration,
427-
catchup: bool,
427+
catchup_delay: Duration,
428428
) -> Result<(), CliError> {
429429
assert!(record_size <= RECORD_BATCH_MAX.bytes);
430430

@@ -634,66 +634,61 @@ pub async fn run(
634634
)));
635635
}
636636

637-
if catchup {
638-
eprintln!();
639-
eprintln!("Waiting 20s before catchup read...");
640-
tokio::time::sleep(Duration::from_secs(20)).await;
641-
642-
let catchup_bar = ProgressBar::no_length()
643-
.with_prefix(format!("{:width$}", "catchup", width = prefix_width))
644-
.with_style(
645-
ProgressStyle::default_bar()
646-
.template("{prefix:.bold.cyan} {msg}")
647-
.expect("valid template"),
648-
);
649-
let mut catchup_sample: Option<BenchReadSample> = None;
650-
let mut catchup_run_hash: Option<u64> = None;
651-
let catchup_stream = bench_read_catchup(stream.clone(), record_size, bench_start);
652-
let mut catchup_stream = std::pin::pin!(catchup_stream);
653-
while let Some(result) = catchup_stream.next().await {
654-
match result {
655-
Ok(sample) => {
656-
update_bench_bar(&catchup_bar, &sample);
657-
if let Some(hash) = sample.run_hash {
658-
catchup_run_hash = Some(hash);
659-
}
660-
catchup_sample = Some(sample);
661-
}
662-
Err(e) => {
663-
catchup_bar.finish_and_clear();
664-
return Err(e);
637+
eprintln!();
638+
eprintln!("Waiting {:?} before catchup read...", catchup_delay);
639+
tokio::time::sleep(catchup_delay).await;
640+
641+
let catchup_bar = ProgressBar::no_length()
642+
.with_prefix(format!("{:width$}", "catchup", width = prefix_width))
643+
.with_style(
644+
ProgressStyle::default_bar()
645+
.template("{prefix:.bold.cyan} {msg}")
646+
.expect("valid template"),
647+
);
648+
let mut catchup_sample: Option<BenchReadSample> = None;
649+
let mut catchup_run_hash: Option<u64> = None;
650+
let catchup_stream = bench_read_catchup(stream.clone(), record_size, bench_start);
651+
let mut catchup_stream = std::pin::pin!(catchup_stream);
652+
while let Some(result) = catchup_stream.next().await {
653+
match result {
654+
Ok(sample) => {
655+
update_bench_bar(&catchup_bar, &sample);
656+
if let Some(hash) = sample.run_hash {
657+
catchup_run_hash = Some(hash);
665658
}
659+
catchup_sample = Some(sample);
660+
}
661+
Err(e) => {
662+
catchup_bar.finish_and_clear();
663+
return Err(e);
666664
}
667665
}
666+
}
668667

669-
catchup_bar.finish_and_clear();
670-
if let Some(sample) = catchup_sample {
671-
eprintln!(
672-
"{}: {:.2} MiB/s, {:.0} records/s ({} bytes, {} records in {:.2}s)",
673-
"Catchup".bold().cyan(),
674-
sample.mib_per_sec(),
675-
sample.records_per_sec(),
676-
sample.bytes,
677-
sample.records,
678-
sample.elapsed.as_secs_f64()
679-
);
680-
} else {
681-
eprintln!(
682-
"{}: no records available for catchup read",
683-
"Catchup".bold().cyan()
684-
);
685-
}
686-
687-
if let (Some(expected), Some(actual)) = (write_run_hash, catchup_run_hash)
688-
&& expected != actual
689-
{
690-
return Err(CliError::BenchVerification(format!(
691-
"catchup read hash mismatch: expected {expected}, got {actual}"
692-
)));
693-
}
668+
catchup_bar.finish_and_clear();
669+
if let Some(sample) = catchup_sample {
670+
eprintln!(
671+
"{}: {:.2} MiB/s, {:.0} records/s ({} bytes, {} records in {:.2}s)",
672+
"Catchup".bold().cyan(),
673+
sample.mib_per_sec(),
674+
sample.records_per_sec(),
675+
sample.bytes,
676+
sample.records,
677+
sample.elapsed.as_secs_f64()
678+
);
694679
} else {
695-
eprintln!();
696-
eprintln!("Skipping catchup read.");
680+
eprintln!(
681+
"{}: no records available for catchup read",
682+
"Catchup".bold().cyan()
683+
);
684+
}
685+
686+
if let (Some(expected), Some(actual)) = (write_run_hash, catchup_run_hash)
687+
&& expected != actual
688+
{
689+
return Err(CliError::BenchVerification(format!(
690+
"catchup read hash mismatch: expected {expected}, got {actual}"
691+
)));
697692
}
698693

699694
Ok(())

src/cli.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -531,9 +531,9 @@ pub struct BenchArgs {
531531
#[arg(short = 'd', long, default_value = "60s")]
532532
pub duration: humantime::Duration,
533533

534-
/// Skip the catchup read after the live run.
535-
#[arg(long, default_value_t = false)]
536-
pub no_catchup: bool,
534+
/// Delay before starting the catchup read.
535+
#[arg(short = 'w', long, default_value = "20s")]
536+
pub catchup_delay: humantime::Duration,
537537
}
538538

539539
/// Time range args for gauge metrics (no interval).

src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ async fn run() -> Result<(), CliError> {
541541
args.record_size as usize,
542542
args.target_mibps,
543543
*args.duration,
544-
!args.no_catchup,
544+
*args.catchup_delay,
545545
)
546546
.await;
547547
let delete_res = basin

tests/integration.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1044,7 +1044,8 @@ fn bench_stream() {
10441044
"1s",
10451045
"--target-mibps",
10461046
"1",
1047-
"--no-catchup",
1047+
"--catchup-delay",
1048+
"0s",
10481049
])
10491050
.assert()
10501051
.success();

0 commit comments

Comments
 (0)