diff --git a/Cargo.lock b/Cargo.lock index 05acdb1..a1b8254 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1840,7 +1840,7 @@ dependencies = [ "rustc-hash", "rustls", "socket2", - "thiserror 2.0.17", + "thiserror 2.0.18", "tokio", "tracing", "web-time", @@ -1862,7 +1862,7 @@ dependencies = [ "rustls", "rustls-pki-types", "slab", - "thiserror 2.0.17", + "thiserror 2.0.18", "tinyvec", "tracing", "web-time", @@ -1943,7 +1943,7 @@ checksum = "a4e608c6638b9c18977b00b475ac1f28d14e84b27d8d42f70e0bf1e3dec127ac" dependencies = [ "getrandom 0.2.17", "libredox", - "thiserror 2.0.17", + "thiserror 2.0.18", ] [[package]] @@ -2212,9 +2212,9 @@ checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984" [[package]] name = "s2-api" -version = "0.3.0" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62e7a403287ebcb410dbf38ac17af9efb6454a651a864229dfbba33e4f0efbe3" +checksum = "064db3afa8a9fe9b1f09a9d5ebb6b0bd20ba0fd041e94e50032fcbe57cd9a022" dependencies = [ "base64ct", "bytes", @@ -2230,7 +2230,7 @@ dependencies = [ "serde", "serde_json", "strum", - "thiserror 2.0.17", + "thiserror 2.0.18", "time", "tokio-util", "zstd", @@ -2265,7 +2265,7 @@ dependencies = [ "strum", "tabled", "tempfile", - "thiserror 2.0.17", + "thiserror 2.0.18", "tokio", "tokio-stream", "toml", @@ -2277,9 +2277,9 @@ dependencies = [ [[package]] name = "s2-common" -version = "0.3.0" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b76e12c74370d1614999b6f9862f02d419a8e6eb67aa86ae6dc6a3cbb57ca3fd" +checksum = "9a5005936767032ff1218f81fd7ad069316e908ca583f73e7534a0d140217a7c" dependencies = [ "blake3", "bytes", @@ -2289,7 +2289,7 @@ dependencies = [ "http", "serde", "strum", - "thiserror 2.0.17", + "thiserror 2.0.18", "time", ] @@ -2313,7 +2313,7 @@ dependencies = [ "secrecy", "serde", "serde_json", - "thiserror 2.0.17", + "thiserror 2.0.18", "time", "tokio", "tokio-muxt", @@ -2749,11 +2749,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.17" +version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" +checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl 2.0.17", + "thiserror-impl 2.0.18", ] [[package]] @@ -2769,9 +2769,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.17" +version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" +checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", @@ -3731,9 +3731,9 @@ dependencies = [ [[package]] name = "zmij" -version = "1.0.15" +version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94f63c051f4fe3c1509da62131a678643c5b6fbdc9273b2b79d4378ebda003d2" +checksum = "dfcd145825aace48cff44a8844de64bf75feec3080e0aa5cdbde72961ae51a65" [[package]] name = "zstd" diff --git a/Cargo.toml b/Cargo.toml index 5d7dc6a..f314000 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,40 +14,40 @@ path = "src/main.rs" [dependencies] async-stream = "0.3.6" -base64ct = { version = "1.7.3", features = ["alloc"] } -bytes = "1.10.1" -clap = { version = "4.5.38", features = ["derive"] } +base64ct = { version = "1.8.3", features = ["alloc"] } +bytes = "1.11.0" +clap = { version = "4.5.54", features = ["derive"] } color-print = "0.3.7" -colored = "3.0.0" -config = "0.15.11" +colored = "3.1.1" +config = "0.15.19" dirs = "6.0.0" futures = "0.3.31" -http = "1.3.1" -humantime = "2.2.0" +http = "1.4.0" +humantime = "2.3.0" indicatif = "0.18.3" json_to_table = "0.12.0" miette = { version = "7.6.0", features = ["fancy"] } -rand = "0.9.1" +rand = "0.9.2" s2-sdk = { version = "0.22.4", features = ["_hidden"] } -serde = { version = "1.0.219", features = ["derive"] } -serde_json = { version = "1.0.140", features = ["preserve_order"] } +serde = { version = "1.0.228", features = ["derive"] } +serde_json = { version = "1.0.149", features = ["preserve_order"] } strum = { version = "0.27", features = ["derive"] } tabled = "0.20.0" -thiserror = "2.0.12" -tokio = { version = "1.45.1", features = ["full"] } -tokio-stream = { version = "0.1.17", features = ["io-util"] } +thiserror = "2.0.18" +tokio = { version = "1.49.0", features = ["full"] } +tokio-stream = { version = "0.1.18", features = ["io-util"] } toml = "0.9.11" -tracing = "0.1.41" -tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } +tracing = "0.1.44" +tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } uuid = { version = "1.19.0", features = ["v4"] } xxhash-rust = { version = "0.8.15", features = ["xxh3"] } [dev-dependencies] -assert_cmd = "2.0" +assert_cmd = "2.1" predicates = "3.1" -rstest = "0.26.0" -serial_test = "3.2" -tempfile = "3.15" +rstest = "0.26.1" +serial_test = "3.3" +tempfile = "3.24" [profile.release] lto = true diff --git a/src/bench.rs b/src/bench.rs index 0a78f0a..b0fbf0b 100644 --- a/src/bench.rs +++ b/src/bench.rs @@ -424,7 +424,7 @@ pub async fn run( record_size: usize, target_mibps: NonZeroU64, duration: Duration, - catchup: bool, + catchup_delay: Duration, ) -> Result<(), CliError> { assert!(record_size <= RECORD_BATCH_MAX.bytes); @@ -634,66 +634,61 @@ pub async fn run( ))); } - if catchup { - eprintln!(); - eprintln!("Waiting 20s before catchup read..."); - tokio::time::sleep(Duration::from_secs(20)).await; - - let catchup_bar = ProgressBar::no_length() - .with_prefix(format!("{:width$}", "catchup", width = prefix_width)) - .with_style( - ProgressStyle::default_bar() - .template("{prefix:.bold.cyan} {msg}") - .expect("valid template"), - ); - let mut catchup_sample: Option = None; - let mut catchup_run_hash: Option = None; - let catchup_stream = bench_read_catchup(stream.clone(), record_size, bench_start); - let mut catchup_stream = std::pin::pin!(catchup_stream); - while let Some(result) = catchup_stream.next().await { - match result { - Ok(sample) => { - update_bench_bar(&catchup_bar, &sample); - if let Some(hash) = sample.run_hash { - catchup_run_hash = Some(hash); - } - catchup_sample = Some(sample); - } - Err(e) => { - catchup_bar.finish_and_clear(); - return Err(e); + eprintln!(); + eprintln!("Waiting {:?} before catchup read...", catchup_delay); + tokio::time::sleep(catchup_delay).await; + + let catchup_bar = ProgressBar::no_length() + .with_prefix(format!("{:width$}", "catchup", width = prefix_width)) + .with_style( + ProgressStyle::default_bar() + .template("{prefix:.bold.cyan} {msg}") + .expect("valid template"), + ); + let mut catchup_sample: Option = None; + let mut catchup_run_hash: Option = None; + let catchup_stream = bench_read_catchup(stream.clone(), record_size, bench_start); + let mut catchup_stream = std::pin::pin!(catchup_stream); + while let Some(result) = catchup_stream.next().await { + match result { + Ok(sample) => { + update_bench_bar(&catchup_bar, &sample); + if let Some(hash) = sample.run_hash { + catchup_run_hash = Some(hash); } + catchup_sample = Some(sample); + } + Err(e) => { + catchup_bar.finish_and_clear(); + return Err(e); } } + } - catchup_bar.finish_and_clear(); - if let Some(sample) = catchup_sample { - eprintln!( - "{}: {:.2} MiB/s, {:.0} records/s ({} bytes, {} records in {:.2}s)", - "Catchup".bold().cyan(), - sample.mib_per_sec(), - sample.records_per_sec(), - sample.bytes, - sample.records, - sample.elapsed.as_secs_f64() - ); - } else { - eprintln!( - "{}: no records available for catchup read", - "Catchup".bold().cyan() - ); - } - - if let (Some(expected), Some(actual)) = (write_run_hash, catchup_run_hash) - && expected != actual - { - return Err(CliError::BenchVerification(format!( - "catchup read hash mismatch: expected {expected}, got {actual}" - ))); - } + catchup_bar.finish_and_clear(); + if let Some(sample) = catchup_sample { + eprintln!( + "{}: {:.2} MiB/s, {:.0} records/s ({} bytes, {} records in {:.2}s)", + "Catchup".bold().cyan(), + sample.mib_per_sec(), + sample.records_per_sec(), + sample.bytes, + sample.records, + sample.elapsed.as_secs_f64() + ); } else { - eprintln!(); - eprintln!("Skipping catchup read."); + eprintln!( + "{}: no records available for catchup read", + "Catchup".bold().cyan() + ); + } + + if let (Some(expected), Some(actual)) = (write_run_hash, catchup_run_hash) + && expected != actual + { + return Err(CliError::BenchVerification(format!( + "catchup read hash mismatch: expected {expected}, got {actual}" + ))); } Ok(()) diff --git a/src/cli.rs b/src/cli.rs index 44df18c..4c9c233 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -531,9 +531,9 @@ pub struct BenchArgs { #[arg(short = 'd', long, default_value = "60s")] pub duration: humantime::Duration, - /// Skip the catchup read after the live run. - #[arg(long, default_value_t = false)] - pub no_catchup: bool, + /// Delay before starting the catchup read. + #[arg(short = 'w', long, default_value = "20s")] + pub catchup_delay: humantime::Duration, } /// Time range args for gauge metrics (no interval). diff --git a/src/main.rs b/src/main.rs index 6b710db..cb5bcab 100644 --- a/src/main.rs +++ b/src/main.rs @@ -541,7 +541,7 @@ async fn run() -> Result<(), CliError> { args.record_size as usize, args.target_mibps, *args.duration, - !args.no_catchup, + *args.catchup_delay, ) .await; let delete_res = basin diff --git a/tests/integration.rs b/tests/integration.rs index b138ffb..c5e8b96 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -1044,7 +1044,8 @@ fn bench_stream() { "1s", "--target-mibps", "1", - "--no-catchup", + "--catchup-delay", + "0s", ]) .assert() .success();