diff --git a/rust/cubestore/.gitignore b/rust/cubestore/.gitignore index 62f3470113011..810464f4b3b51 100644 --- a/rust/cubestore/.gitignore +++ b/rust/cubestore/.gitignore @@ -8,3 +8,4 @@ downloaded cubestore/target cubesql/target cubestore-sql-tests/data/** +cubestore/db-tmp \ No newline at end of file diff --git a/rust/cubestore/Cargo.lock b/rust/cubestore/Cargo.lock index 678ac9caab9ad..28a2a62d93d2f 100644 --- a/rust/cubestore/Cargo.lock +++ b/rust/cubestore/Cargo.lock @@ -139,6 +139,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "anstyle" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "862ed96ca487e809f1c8e5a8447f6ee2cf102f846893800b20cebdf541fc6bbd" + [[package]] name = "anyhow" version = "1.0.42" @@ -320,7 +326,7 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ - "hermit-abi", + "hermit-abi 0.1.19", "libc", "winapi 0.3.9", ] @@ -754,32 +760,36 @@ dependencies = [ "atty", "bitflags 1.3.2", "strsim", - "textwrap 0.11.0", + "textwrap", "unicode-width", "vec_map", ] [[package]] name = "clap" -version = "3.2.23" +version = "4.5.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71655c45cb9845d3270c9d6df84ebe72b4dad3c2ba3f7023ad47c144e4e473a5" +checksum = "40b6887a1d8685cebccf115538db5c0efe625ccac9696ad45c409d96566e910f" dependencies = [ - "bitflags 1.3.2", - "clap_lex", - "indexmap 1.7.0", - "textwrap 0.16.0", + "clap_builder", ] [[package]] -name = "clap_lex" -version = "0.2.4" +name = "clap_builder" +version = "4.5.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5" +checksum = "e0c66c08ce9f0c698cbce5c0279d0bb6ac936d8674174fe48f736533b964f59e" dependencies = [ - "os_str_bytes", + "anstyle", + "clap_lex", ] +[[package]] +name = "clap_lex" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" + [[package]] name = "cloud-storage" version = "0.7.1" @@ -925,20 +935,20 @@ dependencies = [ [[package]] name = "criterion" -version = "0.4.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7c76e09c1aae2bc52b3d2f29e13c6572553b30c4aa1b8a49fd70de6412654cb" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" dependencies = [ "anes", - "atty", "cast", "ciborium", - "clap 3.2.23", + "clap 4.5.40", "criterion-plot", "futures", + "is-terminal", "itertools 0.10.1", - "lazy_static", "num-traits 0.2.14", + "once_cell", "oorandom", "plotters", "rayon", @@ -1190,12 +1200,12 @@ dependencies = [ "base64 0.13.0", "bigdecimal 0.2.0", "bincode", - "bumpalo", "byteorder", "bytes 1.6.0", "chrono", "chrono-tz", "cloud-storage", + "criterion", "csv", "ctor", "cubedatasketches", @@ -1225,6 +1235,7 @@ dependencies = [ "libc", "log", "lru", + "md5 0.8.0", "memchr", "mockall", "moka 0.10.1", @@ -1281,12 +1292,9 @@ dependencies = [ "cuberockstore", "cubestore", "flate2", - "futures", - "futures-timer 3.0.2", "indoc", "ipc-channel", "itertools 0.9.0", - "lazy_static", "log", "pretty_assertions", "reqwest 0.12.5", @@ -2055,6 +2063,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "hex" version = "0.4.3" @@ -2363,6 +2377,17 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68f2d64f2edebec4ce84ad108148e67e1064789bee435edc5b60ad398714a3a9" +[[package]] +name = "is-terminal" +version = "0.4.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" +dependencies = [ + "hermit-abi 0.5.2", + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "itertools" version = "0.9.0" @@ -2390,6 +2415,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "0.4.7" @@ -2657,6 +2691,12 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" +[[package]] +name = "md5" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae960838283323069879657ca3de837e9f7bbb4c7bf6ea7f1b290d5e9476d2e0" + [[package]] name = "memchr" version = "2.4.0" @@ -3153,7 +3193,7 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" dependencies = [ - "hermit-abi", + "hermit-abi 0.1.19", "libc", ] @@ -3406,12 +3446,6 @@ dependencies = [ "windows-sys 0.42.0", ] -[[package]] -name = "os_str_bytes" -version = "6.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" - [[package]] name = "output_vt100" version = "0.1.2" @@ -3619,9 +3653,9 @@ checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c" [[package]] name = "plotters" -version = "0.3.1" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32a3fd9ec30b9749ce28cd91f255d569591cdf937fe280c312143e3c4bad6f2a" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" dependencies = [ "num-traits 0.2.14", "plotters-backend", @@ -3632,15 +3666,15 @@ dependencies = [ [[package]] name = "plotters-backend" -version = "0.3.2" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d88417318da0eaf0fdcdb51a0ee6c3bed624333bff8f946733049380be67ac1c" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" [[package]] name = "plotters-svg" -version = "0.3.1" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "521fa9638fa597e1dc53e9412a4f9cefb01187ee1f7413076f9e6749e2885ba9" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" dependencies = [ "plotters-backend", ] @@ -3789,7 +3823,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.13.0", "proc-macro2", "quote", "syn 2.0.58", @@ -4343,7 +4377,7 @@ dependencies = [ "http 0.2.12", "log", "maybe-async", - "md5", + "md5 0.7.0", "percent-encoding", "reqwest 0.11.27", "serde", @@ -5061,12 +5095,6 @@ dependencies = [ "unicode-width", ] -[[package]] -name = "textwrap" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" - [[package]] name = "thin-dst" version = "1.1.0" diff --git a/rust/cubestore/Dockerfile b/rust/cubestore/Dockerfile index db2ec1ec76cd1..482ebe30d8dc4 100644 --- a/rust/cubestore/Dockerfile +++ b/rust/cubestore/Dockerfile @@ -12,6 +12,7 @@ COPY cubestore/cubezetasketch cubezetasketch COPY cubestore/cubedatasketches cubedatasketches COPY cubestore/cuberpc cuberpc COPY cubestore/cubestore-sql-tests cubestore-sql-tests +COPY cubestore/cubestore/benches cubestore/benches COPY cubestore/cubestore/Cargo.toml cubestore/Cargo.toml RUN mkdir -p cubestore/src/bin && \ echo "fn main() {print!(\"Dummy main\");} // dummy file" > cubestore/src/bin/cubestored.rs diff --git a/rust/cubestore/cubestore-sql-tests/Cargo.toml b/rust/cubestore/cubestore-sql-tests/Cargo.toml index 05873bc2d2056..e2b9e69c244a7 100644 --- a/rust/cubestore/cubestore-sql-tests/Cargo.toml +++ b/rust/cubestore/cubestore-sql-tests/Cargo.toml @@ -37,9 +37,7 @@ async-compression = { version = "0.3.7", features = ["gzip", "tokio"] } async-trait = "0.1.36" cubestore = { path = "../cubestore" } flate2 = "1.0.22" -futures = "0.3.5" itertools = "0.9.0" -lazy_static = "1.4.0" log = "0.4.11" pretty_assertions = "0.7.1" reqwest = { version = "0.12.5", features = ["json", "rustls-tls", "stream", "http2"], default-features = false } @@ -47,13 +45,12 @@ scopeguard = "1.1.0" serde = "1.0.115" serde_derive = "1.0.115" tokio = { version = "1", features = ["full", "rt"] } -futures-timer = "3.0.2" indoc = "1.0" tempfile = "3.2.0" tar = "0.4.38" [dev-dependencies] -criterion = { version = "0.4.0", features = ["async_tokio", "html_reports"] } +criterion = { version = "0.5.1", features = ["async_tokio", "html_reports"] } cuberockstore = { path = "../cuberockstore" } [[bench]] diff --git a/rust/cubestore/cubestore/Cargo.toml b/rust/cubestore/cubestore/Cargo.toml index 33f824ba84315..3efdf1813914d 100644 --- a/rust/cubestore/cubestore/Cargo.toml +++ b/rust/cubestore/cubestore/Cargo.toml @@ -15,7 +15,6 @@ libc = { version = "0.2.97", optional = true } [dependencies] base64 = "0.13.0" -bumpalo = "3.6.1" tokio = { version = "1", features = ["full", "rt"] } warp = { version = "0.3.6" } sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "4388f6712dae5073c2d71d74f64cae2edd418066" } @@ -120,6 +119,12 @@ sasl2-sys = { version = "0.1.6", features = ["vendored"] } [dev-dependencies] pretty_assertions = "0.7.1" +criterion = { version = "0.5.1", features = ["async_tokio", "html_reports"] } +md5 = "0.8.0" + +[[bench]] +name = "cachestore_queue" +harness = false [features] # When enabled, child processes will die whenever parent process exits. diff --git a/rust/cubestore/cubestore/benches/cachestore_queue.rs b/rust/cubestore/cubestore/benches/cachestore_queue.rs new file mode 100644 index 0000000000000..8dccaf6be74bb --- /dev/null +++ b/rust/cubestore/cubestore/benches/cachestore_queue.rs @@ -0,0 +1,204 @@ +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use cubestore::cachestore::{ + CacheStore, QueueAddPayload, QueueItemStatus, QueueKey, RocksCacheStore, +}; +use cubestore::config::{Config, CubeServices}; +use cubestore::CubeError; +use std::sync::Arc; +use tokio::runtime::{Builder, Runtime}; + +fn prepare_cachestore(name: &str) -> Result, CubeError> { + let config = Config::test(&name).update_config(|mut config| { + // disable periodic eviction + config.cachestore_cache_eviction_loop_interval = 100000; + + config + }); + + let (_, cachestore) = RocksCacheStore::prepare_bench_cachestore(&name, config); + + let cachestore_to_move = cachestore.clone(); + + tokio::task::spawn(async move { + let loops = cachestore_to_move.spawn_processing_loops(); + CubeServices::wait_loops(loops).await + }); + + Ok(cachestore) +} + +fn generate_queue_path(queue_path: &str, queue_id: usize) -> String { + format!( + "{}:{}", + queue_path, + format!("{:x}", md5::compute(queue_id.to_be_bytes())) + ) +} + +async fn do_insert( + cachestore: &Arc, + total: usize, + size_kb: usize, + queue_path: &str, + insert_id_padding: usize, +) { + for i in 0..total { + let fut = cachestore.queue_add(QueueAddPayload { + path: generate_queue_path(queue_path, i + insert_id_padding), + value: "a".repeat(size_kb * 1024), // size in bytes + priority: 0, + orphaned: None, + }); + + let res = fut.await; + assert!(res.is_ok()); + } +} + +fn do_insert_bench(c: &mut Criterion, runtime: &Runtime, total: usize, size_kb: usize) { + let cachestore = runtime.block_on(async { + prepare_cachestore(&format!("cachestore_queue_add_{}", size_kb)).unwrap() + }); + + c.bench_with_input( + BenchmarkId::new(format!("queue_add queues:1, size:{} kb", size_kb), total), + &(total, size_kb), + |b, (total, size_kb)| { + let mut insert_id_padding = 0; + + b.to_async(runtime).iter(|| { + let prev_value = insert_id_padding.clone(); + insert_id_padding += total; + + do_insert( + &cachestore, + *total, + *size_kb, + &"STANDALONE#queue", + prev_value, + ) + }); + }, + ); +} + +async fn do_list( + cachestore: &Arc, + status_filter: Option, + total: usize, +) { + for _ in 0..total { + let fut = cachestore.queue_list( + "STANDALONE#queue:1".to_string(), + status_filter.clone(), + true, + false, + ); + + let res = fut.await; + assert!(res.is_ok()); + } +} + +fn do_list_bench( + c: &mut Criterion, + runtime: &Runtime, + status_filter: Option, + per_queue: usize, + size_kb: usize, + total: usize, +) { + let cachestore = runtime.block_on(async { + let cachestore = prepare_cachestore(&format!( + "cachestore_queue_list_{}_{}", + format!("{:?}", status_filter).to_ascii_lowercase(), + size_kb + )) + .unwrap(); + + do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await; + do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await; + do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await; + do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await; + do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await; + + cachestore + }); + + c.bench_with_input( + BenchmarkId::new( + format!( + "queue_list status_filter: {:?} queues:5, size:{} kb, per_queue:{}", + status_filter, size_kb, per_queue + ), + total, + ), + &(status_filter, total), + |b, (status_filter, total)| { + b.to_async(runtime) + .iter(|| do_list(&cachestore, status_filter.clone(), *total)); + }, + ); +} + +async fn do_get(cachestore: &Arc, total: usize) { + for i in 0..total { + let fut = cachestore.queue_get(QueueKey::ByPath(generate_queue_path( + "STANDALONE#queue", + i + ((i - 1) * 5), + ))); + + let res = fut.await; + assert!(res.is_ok()); + } +} + +fn do_get_bench( + c: &mut Criterion, + runtime: &Runtime, + per_queue: usize, + size_kb: usize, + total: usize, +) { + let cachestore = runtime.block_on(async { + let cachestore = prepare_cachestore(&format!("cachestore_queue_get_{}", size_kb)).unwrap(); + + do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await; + do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await; + do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await; + do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await; + do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await; + + cachestore + }); + + c.bench_with_input( + BenchmarkId::new( + format!( + "queue_get queues:5, size:{} kb, per_queue:{}", + size_kb, per_queue + ), + total, + ), + &total, + |b, total| { + b.to_async(runtime).iter(|| do_get(&cachestore, *total)); + }, + ); +} + +fn do_benches(c: &mut Criterion) { + let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); + + do_insert_bench(c, &runtime, 512, 64); + do_insert_bench(c, &runtime, 512, 256); + do_insert_bench(c, &runtime, 512, 512); + + do_list_bench(c, &runtime, Some(QueueItemStatus::Pending), 1_000, 128, 128); + do_list_bench(c, &runtime, Some(QueueItemStatus::Active), 1_000, 128, 128); + + do_get_bench(c, &runtime, 10_000, 128, 128); +} + +criterion_group!(benches, do_benches); +criterion_main!(benches); diff --git a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs index 8b543ee0acc1e..c7ffc32a1a98c 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs @@ -389,13 +389,29 @@ impl RocksCacheStore { self.store.add_listener(listener).await; } + pub fn prepare_bench_cachestore( + test_name: &str, + config: Config, + ) -> (Arc, Arc) { + let store_path = env::current_dir() + .unwrap() + .join("db-tmp") + .join("benchmarks") + .join(format!("{}", test_name)); + let _ = std::fs::remove_dir_all(store_path.clone()); + + Self::prepare_test_cachestore_impl(test_name, store_path, config) + } + pub fn prepare_test_cachestore( test_name: &str, config: Config, ) -> (Arc, Arc) { let store_path = env::current_dir() .unwrap() - .join(format!("test-{}-local", test_name)); + .join("db-tmp") + .join("tests") + .join(format!("{}-local", test_name)); let _ = std::fs::remove_dir_all(store_path.clone()); Self::prepare_test_cachestore_impl(test_name, store_path, config) @@ -1296,7 +1312,7 @@ impl CacheStore for RocksCacheStore { .map(|item| item.into_row().key) .collect(); if active.len() >= (allow_concurrency as usize) { - return Ok(QueueRetrieveResponse::NotFound { pending, active }); + return Ok(QueueRetrieveResponse::NotEnoughConcurrency { pending, active }); } let id_row = queue_schema.get_single_opt_row_by_index( @@ -1312,7 +1328,7 @@ impl CacheStore for RocksCacheStore { if id_row.get_row().get_status() == &QueueItemStatus::Pending { let mut new = id_row.get_row().clone(); new.status = QueueItemStatus::Active; - // It's an important to insert heartbeat, because + // It's important to insert heartbeat, because // without that created datetime will be used for orphaned filtering new.update_heartbeat(); @@ -1409,8 +1425,8 @@ impl CacheStore for RocksCacheStore { key: QueueKey, timeout: u64, ) -> Result, CubeError> { - // It's an important to open listener at the beginning to protect race condition - // it will fix position (subscribe) of broadcast channel + // It's important to open listener at the beginning to protect race condition + // it will fix the position (subscribe) of a broadcast channel let listener = self.get_listener().await; let store_in_result = self.lookup_queue_result_by_key(key.clone()).await?;