diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 6239585..06c7a46 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -78,7 +78,7 @@ jobs:
               "repo": "${{ github.repository }}",
               "ref": "${{ github.ref }}",
               "lang": "rust",
-              "test_cmd": "cargo install cargo-nextest && cargo nextest run --test integration -j 1"
+              "test_cmd": "cargo test --test integration -j 1"
             }
           ]
   lint:
diff --git a/Cargo.lock b/Cargo.lock
index 3e05d4d..7e981a4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -174,9 +174,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"

 [[package]]
 name = "aws-lc-rs"
-version = "1.15.3"
+version = "1.15.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e84ce723ab67259cfeb9877c6a639ee9eb7a27b28123abd71db7f0d5d0cc9d86"
+checksum = "7b7b6141e96a8c160799cc2d5adecd5cbbe5054cb8c7c4af53da0f83bb7ad256"
 dependencies = [
  "aws-lc-sys",
  "zeroize",
@@ -184,9 +184,9 @@ dependencies = [

 [[package]]
 name = "aws-lc-sys"
-version = "0.36.0"
+version = "0.37.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "43a442ece363113bd4bd4c8b18977a7798dd4d3c3383f34fb61936960e8f4ad8"
+checksum = "5c34dda4df7017c8db52132f0f8a2e0f8161649d15723ed63fc00c82d0f2081a"
 dependencies = [
  "cc",
  "cmake",
@@ -302,9 +302,9 @@ dependencies = [

 [[package]]
 name = "cc"
-version = "1.2.53"
+version = "1.2.54"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "755d2fce177175ffca841e9a06afdb2c4ab0f593d53b4dee48147dfaade85932"
+checksum = "6354c81bbfd62d9cfa9cb3c773c2b7b2a3a482d569de977fd0e961f6e7c00583"
 dependencies = [
  "find-msvc-tools",
  "jobserver",
@@ -1514,9 +1514,9 @@ dependencies = [

 [[package]]
 name = "num-conv"
-version = "0.1.0"
+version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
+checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050"

 [[package]]
 name = "num-traits"
@@ -1550,9 +1550,9 @@ checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe"

 [[package]]
 name = "openssl-probe"
-version = "0.2.0"
+version = "0.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9f50d9b3dabb09ecd771ad0aa242ca6894994c130308ca3d7684634df8037391"
+checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe"

 [[package]]
 name = "option-ext"
@@ -1796,9 +1796,9 @@ dependencies = [

 [[package]]
 name = "proc-macro2"
-version = "1.0.105"
+version = "1.0.106"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "535d180e0ecab6268a3e718bb9fd44db66bbbc256257165fc699dadf70d16fe7"
+checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934"
 dependencies = [
  "unicode-ident",
 ]
@@ -1884,9 +1884,9 @@ dependencies = [

 [[package]]
 name = "quote"
-version = "1.0.43"
+version = "1.0.44"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dc74d9a594b72ae6656596548f56f667211f8a97b3d4c3d467150794690dc40a"
+checksum = "21b2ebcf727b7760c461f091f9f0f539b77b8e87f2fd88131e7f1b433b3cece4"
 dependencies = [
  "proc-macro2",
 ]
@@ -2212,9 +2212,9 @@ checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984"

 [[package]]
 name = "s2-api"
-version = "0.3.3"
+version = "0.3.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "064db3afa8a9fe9b1f09a9d5ebb6b0bd20ba0fd041e94e50032fcbe57cd9a022"
"7dd1415b83de9008573bfc3764c73a6e92167f521ca39e389889a22aa251b679" dependencies = [ "base64ct", "bytes", @@ -2277,9 +2277,9 @@ dependencies = [ [[package]] name = "s2-common" -version = "0.3.3" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a5005936767032ff1218f81fd7ad069316e908ca583f73e7534a0d140217a7c" +checksum = "d62d7b095f290ced3bec82814b129efd5fb1d974b950c3ce1e7879e439947d67" dependencies = [ "blake3", "bytes", @@ -2295,9 +2295,9 @@ dependencies = [ [[package]] name = "s2-sdk" -version = "0.22.5" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f89db7b36e8f93215e35daae6f26be50fee154abbf7ed45cef5bc74dd6e50ac" +checksum = "458942269611ac68ecbfb3a30596416b7f58499e51810eeb29f06277a65651fa" dependencies = [ "async-compression", "async-stream", @@ -2561,9 +2561,9 @@ checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" [[package]] name = "socket2" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881" +checksum = "86f4aa3ad99f2088c990dfa82d367e19cb29268ed67c574d10d0a4bfe71f07e0" dependencies = [ "libc", "windows-sys 0.60.2", @@ -2789,9 +2789,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.45" +version = "0.3.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9e442fc33d7fdb45aa9bfeb312c095964abdf596f7567261062b2a7107aaabd" +checksum = "9da98b7d9b7dad93488a84b8248efc35352b0b2657397d4167e7ad67e5d535e5" dependencies = [ "deranged", "itoa", @@ -2804,15 +2804,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b36ee98fd31ec7426d599183e8fe26932a8dc1fb76ddb6214d05493377d34ca" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" [[package]] name = "time-macros" -version = "0.2.25" +version = "0.2.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71e552d1249bf61ac2a52db88179fd0673def1e1ad8243a00d9ec9ed71fee3dd" +checksum = "78cc610bac2dcee56805c99642447d4c5dbde4d01f752ffea0199aee1f601dc4" dependencies = [ "num-conv", "time-core", @@ -3185,9 +3185,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.19.0" +version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" +checksum = "ee48d38b119b0cd71fe4141b30f5ba9c7c5d9f4e7a3a8b4a674e4b6ef789976f" dependencies = [ "getrandom 0.3.4", "js-sys", diff --git a/Cargo.toml b/Cargo.toml index 6c4e5e2..43fa6bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ indicatif = "0.18.3" json_to_table = "0.12.0" miette = { version = "7.6.0", features = ["fancy"] } rand = "0.9.2" -s2-sdk = { version = "0.22.5", features = ["_hidden"] } +s2-sdk = { version = "0.23.0", features = ["_hidden"] } serde = { version = "1.0.228", features = ["derive"] } serde_json = { version = "1.0.149", features = ["preserve_order"] } strum = { version = "0.27", features = ["derive"] } @@ -39,7 +39,7 @@ tokio-stream = { version = "0.1.18", features = ["io-util"] } toml = "0.9.11" tracing = "0.1.44" tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } -uuid = { version = "1.19.0", features = ["v4"] } +uuid = { version = "1.20.0", 
features = ["v4"] } xxhash-rust = { version = "0.8.15", features = ["xxh3"] } [dev-dependencies] diff --git a/src/bench.rs b/src/bench.rs index b0fbf0b..5618398 100644 --- a/src/bench.rs +++ b/src/bench.rs @@ -16,7 +16,7 @@ use s2_sdk::{ producer::{IndexedAppendAck, ProducerConfig}, types::{ AppendRecord, Header, RECORD_BATCH_MAX, ReadFrom, ReadInput, ReadStart, ReadStop, S2Error, - SequencedRecord, ValidationError, + SequencedRecord, }, }; use tokio::sync::mpsc; @@ -39,7 +39,7 @@ struct BenchWriteSample { records: u64, elapsed: Duration, ack_latencies: Vec, - run_hash: Option, + chain_hash: Option, } struct BenchReadSample { @@ -47,7 +47,7 @@ struct BenchReadSample { records: u64, elapsed: Duration, e2e_latencies: Vec, - run_hash: Option, + chain_hash: Option, } trait BenchSample { @@ -105,11 +105,13 @@ fn record_body(record_size: usize, rng: &mut rand::rngs::StdRng) -> Bytes { Bytes::from(body) } -fn record(body: Bytes, timestamp: u64, hash: u64) -> Result { - let hash_header = Header::new(HASH_HEADER_NAME, hash.to_be_bytes().to_vec()); - let record = AppendRecord::new(body)?; - let record = record.with_headers([hash_header])?; - Ok(record.with_timestamp(timestamp)) +fn new_record(body: Bytes, timestamp: u64, hash: u64) -> AppendRecord { + AppendRecord::new(body) + .and_then(|record| { + record.with_headers([Header::new(HASH_HEADER_NAME, hash.to_be_bytes().to_vec())]) + }) + .expect("valid") + .with_timestamp(timestamp) } fn record_hash(record: &SequencedRecord) -> Result { @@ -142,9 +144,8 @@ fn bench_write( write_done_records: Arc, bench_start: Instant, ) -> impl Stream> + Send { - let metered_size = record(Bytes::from(vec![0u8; body_size(record_size)]), 0, 0) - .expect("valid record") - .metered_bytes(); + let metered_size = + new_record(Bytes::from(vec![0u8; body_size(record_size)]), 0, 0).metered_bytes(); assert_eq!(metered_size, record_size); let producer = stream.producer(ProducerConfig::default()); @@ -158,7 +159,6 @@ fn bench_write( let mut last_yield = Instant::now(); let mut rng = rand::rngs::StdRng::seed_from_u64(0); let mut prev_hash: u64 = 0; - let mut run_hasher = Xxh3Default::new(); let mut next_seq_num: u64 = 0; let mut pending_acks: FuturesUnordered = FuturesUnordered::new(); @@ -208,7 +208,7 @@ fn bench_write( records: total_records, elapsed: throughput_start.elapsed(), ack_latencies: std::mem::take(&mut ack_latencies), - run_hash: None, + chain_hash: None, }); } } @@ -231,18 +231,9 @@ fn bench_write( let body = record_body(record_size, &mut rng); let hash = chain_hash(prev_hash, body.as_ref()); prev_hash = hash; - run_hasher.update(&hash.to_be_bytes()); - let rec = match record(body, timestamp, hash) { - Ok(r) => r, - Err(e) => { - yield Err(CliError::InvalidArgs(miette::miette!( - "Invalid bench record: {e}" - ))); - return; - } - }; + let record = new_record(body, timestamp, hash); pending_acks.push(Box::pin(async move { - let res = permit.submit(rec).await; + let res = permit.submit(record).await; (submit_time, res) })); bytes_submitted += record_size; @@ -262,7 +253,7 @@ fn bench_write( records: total_records, elapsed: throughput_start.elapsed(), ack_latencies, - run_hash: Some(run_hasher.digest()), + chain_hash: Some(prev_hash), }); } } @@ -318,7 +309,6 @@ fn bench_read_inner( let mut last_yield = Instant::now(); let mut e2e_latencies: Vec = Vec::new(); let mut prev_hash: u64 = 0; - let mut run_hasher = Xxh3Default::new(); let mut poll_interval = tokio::time::interval(Duration::from_millis(250)); @@ -356,7 +346,7 @@ fn bench_read_inner( return; } - let 
-                    let expected_hash = match record_hash(record) {
+                    let header_hash = match record_hash(record) {
                         Ok(hash) => hash,
                         Err(err) => {
                             yield Err(CliError::BenchVerification(format!(
@@ -366,16 +356,24 @@ fn bench_read_inner(
                             return;
                         }
                     };
-                    let hash = chain_hash(prev_hash, record.body.as_ref());
-                    if hash != expected_hash {
+
+                    if header_hash == prev_hash {
+                        yield Err(CliError::BenchVerification(format!(
+                            "duplicate record hash at seq_num {}",
+                            record.seq_num
+                        )));
+                        return;
+                    }
+
+                    let computed_hash = chain_hash(prev_hash, record.body.as_ref());
+                    if computed_hash != header_hash {
                         yield Err(CliError::BenchVerification(format!(
                             "unexpected record hash at seq_num {}",
                             record.seq_num
                         )));
                         return;
                     }
-                    run_hasher.update(&hash.to_be_bytes());
-                    prev_hash = hash;
+                    prev_hash = computed_hash;
                     e2e_latencies.push(Duration::from_micros(
                         now_micros.saturating_sub(record.timestamp),
                     ));
@@ -391,7 +389,7 @@ fn bench_read_inner(
                     records: total_records,
                     elapsed: throughput_start.elapsed(),
                     e2e_latencies: std::mem::take(&mut e2e_latencies),
-                    run_hash: None,
+                    chain_hash: None,
                 });
             }

@@ -414,7 +412,7 @@ fn bench_read_inner(
             records: total_records,
             elapsed: throughput_start.elapsed(),
             e2e_latencies,
-            run_hash: Some(run_hasher.digest()),
+            chain_hash: Some(prev_hash),
         });
     }
 }
@@ -475,8 +473,8 @@ pub async fn run(
     let mut read_sample: Option<BenchReadSample> = None;
     let mut all_ack_latencies: Vec<Duration> = Vec::new();
     let mut all_e2e_latencies: Vec<Duration> = Vec::new();
-    let mut write_run_hash: Option<u64> = None;
-    let mut read_run_hash: Option<u64> = None;
+    let mut write_chain_hash: Option<u64> = None;
+    let mut read_chain_hash: Option<u64> = None;

     let stop = Arc::new(AtomicBool::new(false));
     let write_done_records = Arc::new(AtomicU64::new(WRITE_DONE_SENTINEL));
@@ -545,8 +543,8 @@ pub async fn run(
             Some(BenchEvent::Write(Ok(sample))) => {
                 update_bench_bar(&write_bar, &sample);
                 all_ack_latencies.extend(sample.ack_latencies.iter().copied());
-                if let Some(hash) = sample.run_hash {
-                    write_run_hash = Some(hash);
+                if let Some(hash) = sample.chain_hash {
+                    write_chain_hash = Some(hash);
                 }
                 write_sample = Some(sample);
             }
@@ -564,8 +562,8 @@ pub async fn run(
             Some(BenchEvent::Read(Ok(sample))) => {
                 update_bench_bar(&read_bar, &sample);
                 all_e2e_latencies.extend(sample.e2e_latencies.iter().copied());
-                if let Some(hash) = sample.run_hash {
-                    read_run_hash = Some(hash);
+                if let Some(hash) = sample.chain_hash {
+                    read_chain_hash = Some(hash);
                 }
                 read_sample = Some(sample);
             }
@@ -626,7 +624,16 @@ pub async fn run(
         print_latency_stats(LatencyStats::compute(all_e2e_latencies), "End-to-End");
     }

-    if let (Some(expected), Some(actual)) = (write_run_hash, read_run_hash)
+    if let (Some(write_sample), Some(read_sample)) = (write_sample.as_ref(), read_sample.as_ref())
+        && write_sample.records != read_sample.records
+    {
+        return Err(CliError::BenchVerification(format!(
+            "live read record count mismatch: expected {}, got {}",
+            write_sample.records, read_sample.records
+        )));
+    }
+
+    if let (Some(expected), Some(actual)) = (write_chain_hash, read_chain_hash)
         && expected != actual
     {
         return Err(CliError::BenchVerification(format!(
@@ -646,15 +653,15 @@ pub async fn run(
             .expect("valid template"),
     );
     let mut catchup_sample: Option<BenchReadSample> = None;
-    let mut catchup_run_hash: Option<u64> = None;
+    let mut catchup_chain_hash: Option<u64> = None;
     let catchup_stream = bench_read_catchup(stream.clone(), record_size, bench_start);
     let mut catchup_stream = std::pin::pin!(catchup_stream);
     while let Some(result) = catchup_stream.next().await {
         match result {
             Ok(sample) => {
&sample); - if let Some(hash) = sample.run_hash { - catchup_run_hash = Some(hash); + if let Some(hash) = sample.chain_hash { + catchup_chain_hash = Some(hash); } catchup_sample = Some(sample); } @@ -666,7 +673,7 @@ pub async fn run( } catchup_bar.finish_and_clear(); - if let Some(sample) = catchup_sample { + if let Some(sample) = &catchup_sample { eprintln!( "{}: {:.2} MiB/s, {:.0} records/s ({} bytes, {} records in {:.2}s)", "Catchup".bold().cyan(), @@ -683,7 +690,17 @@ pub async fn run( ); } - if let (Some(expected), Some(actual)) = (write_run_hash, catchup_run_hash) + if let (Some(write_sample), Some(catchup_sample)) = + (write_sample.as_ref(), catchup_sample.as_ref()) + && write_sample.records != catchup_sample.records + { + return Err(CliError::BenchVerification(format!( + "catchup read record count mismatch: expected {}, got {}", + write_sample.records, catchup_sample.records + ))); + } + + if let (Some(expected), Some(actual)) = (write_chain_hash, catchup_chain_hash) && expected != actual { return Err(CliError::BenchVerification(format!( diff --git a/src/config.rs b/src/config.rs index 1b22d30..8c3d002 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,7 +3,7 @@ use std::{path::PathBuf, time::Duration}; use config::{Config, FileFormat}; use s2_sdk::{ self as sdk, - types::{S2Config, S2Endpoints}, + types::{AccountEndpoint, BasinEndpoint, S2Config, S2Endpoints}, }; use serde::{Deserialize, Serialize}; @@ -163,13 +163,17 @@ pub fn sdk_config(config: &CliConfig) -> Result { let mut sdk_config = S2Config::new(access_token) .with_user_agent("s2-cli") - .map_err(|e| CliError::EndpointsFromEnv(e.to_string()))? + .expect("valid user agent") .with_request_timeout(Duration::from_secs(30)) .with_compression(compression); match (&config.account_endpoint, &config.basin_endpoint) { - (Some(account), Some(basin)) => { - let endpoints = S2Endpoints::parse_from(account, basin) + (Some(account_endpoint_str), Some(basin_endpoint_str)) => { + let account_endpoint = AccountEndpoint::new(account_endpoint_str) + .map_err(|e| CliError::EndpointsFromEnv(e.to_string()))?; + let basin_endpoint = BasinEndpoint::new(basin_endpoint_str) + .map_err(|e| CliError::EndpointsFromEnv(e.to_string()))?; + let endpoints = S2Endpoints::new(account_endpoint, basin_endpoint) .map_err(|e| CliError::EndpointsFromEnv(e.to_string()))?; sdk_config = sdk_config.with_endpoints(endpoints); } diff --git a/src/main.rs b/src/main.rs index cb5bcab..c7bd43f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -536,23 +536,19 @@ async fn run() -> Result<(), CliError> { args.duration, args.target_mibps, args.record_size, ); - let bench_res = bench::run( + bench::run( basin.stream(stream_name.clone()), args.record_size as usize, args.target_mibps, *args.duration, *args.catchup_delay, ) - .await; - let delete_res = basin + .await?; + + basin .delete_stream(DeleteStreamInput::new(stream_name)) .await - .map_err(|e| CliError::op(OpKind::Bench, e)); - if let Err(err) = bench_res { - let _ = delete_res; - return Err(err); - } - delete_res?; + .map_err(|e| CliError::op(OpKind::Bench, e))?; } } diff --git a/src/ops.rs b/src/ops.rs index abab2f0..8bd03f4 100644 --- a/src/ops.rs +++ b/src/ops.rs @@ -59,7 +59,7 @@ pub async fn list_basins<'a>( Ok(Box::pin(stream::iter(page.values.into_iter().map(Ok)))) } else { - let mut input = ListAllBasinsInput::new().with_ignore_pending_deletions(false); + let mut input = ListAllBasinsInput::new().with_include_deleted(true); if let Some(p) = prefix { input = input.with_prefix(p); } @@ -320,7 +320,7 @@ pub async fn 
list_streams<'a>( Ok(Box::pin(stream::iter(page.values.into_iter().map(Ok)))) } else { - let mut input = ListAllStreamsInput::new().with_ignore_pending_deletions(false); + let mut input = ListAllStreamsInput::new().with_include_deleted(true); if let Some(p) = prefix { input = input.with_prefix(p); }