diff --git a/Cargo.lock b/Cargo.lock
index 1243e4eed5f..c486f367a09 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -476,6 +476,7 @@ dependencies = [
  "arrow-array",
  "arrow-schema",
  "arrow-select",
+ "async-trait",
  "bytes",
  "bzip2",
  "clap",
@@ -485,6 +486,7 @@ dependencies = [
  "datafusion-physical-plan",
  "enum-iterator",
  "futures",
+ "governor",
  "homedir",
  "humansize",
  "indicatif",
@@ -494,6 +496,7 @@ dependencies = [
  "object_store",
  "parquet",
  "rand",
+ "rand_distr",
  "rayon",
  "regex",
  "reqwest",
@@ -2038,6 +2041,27 @@ dependencies = [
  "yansi",
 ]
 
+[[package]]
+name = "governor"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "842dc78579ce01e6a1576ad896edc92fca002dd60c9c3746b7fc2bec6fb429d0"
+dependencies = [
+ "cfg-if",
+ "dashmap",
+ "futures-sink",
+ "futures-timer",
+ "futures-util",
+ "no-std-compat",
+ "nonzero_ext",
+ "parking_lot",
+ "portable-atomic",
+ "quanta",
+ "rand",
+ "smallvec",
+ "spinning_top",
+]
+
 [[package]]
 name = "h2"
 version = "0.4.7"
@@ -2883,6 +2907,18 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "no-std-compat"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
+
+[[package]]
+name = "nonzero_ext"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
+
 [[package]]
 name = "nu-ansi-term"
 version = "0.46.0"
@@ -3615,6 +3651,21 @@ dependencies = [
  "vortex",
 ]
 
+[[package]]
+name = "quanta"
+version = "0.12.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e"
+dependencies = [
+ "crossbeam-utils",
+ "libc",
+ "once_cell",
+ "raw-cpuid",
+ "wasi",
+ "web-sys",
+ "winapi",
+]
+
 [[package]]
 name = "quick-xml"
 version = "0.37.2"
@@ -3725,6 +3776,25 @@ dependencies = [
  "getrandom",
 ]
 
+[[package]]
+name = "rand_distr"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31"
+dependencies = [
+ "num-traits",
+ "rand",
+]
+
+[[package]]
+name = "raw-cpuid"
+version = "11.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c6928fa44c097620b706542d428957635951bade7143269085389d42c8a4927e"
+dependencies = [
+ "bitflags 2.8.0",
+]
+
 [[package]]
 name = "rayon"
 version = "1.10.0"
@@ -4329,6 +4399,15 @@ dependencies = [
  "lock_api",
 ]
 
+[[package]]
+name = "spinning_top"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300"
+dependencies = [
+ "lock_api",
+]
+
 [[package]]
 name = "sqlparser"
 version = "0.53.0"
diff --git a/Cargo.toml b/Cargo.toml
index 9445b993744..073e2c6ab05 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -101,6 +101,7 @@ libfuzzer-sys = "0.4"
 log = "0.4.21"
 mimalloc = "0.1.42"
 moka = "0.12"
+governor = "0.8"
 num-traits = "0.2.18"
 num_enum = "0.7.2"
 object_store = "0.11.0"
@@ -116,6 +117,7 @@ pyo3 = { version = ">= 0.22", features = ["extension-module", "abi3-py310"] }
 pyo3-log = ">= 0.11"
 rancor = "0.1.0"
 rand = "0.8.5"
+rand_distr = "0.4"
 rayon = "1.10.0"
 regex = "1.11.0"
 reqwest = { version = "0.12.0", features = ["blocking"] }
diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml
index 56923e9c110..0aa9b460ba4 100644
--- a/bench-vortex/Cargo.toml
+++ b/bench-vortex/Cargo.toml
@@ -26,6 +26,7 @@ anyhow = { workspace = true }
 arrow-array = { workspace = true }
 arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
+async-trait = { workspace = true }
 bytes = { workspace = true }
 bzip2 = { workspace = true }
 clap = { workspace = true, features = ["derive"] }
@@ -37,6 +38,7 @@ datafusion-common = { workspace = true }
 datafusion-physical-plan = { workspace = true }
 enum-iterator = { workspace = true }
 futures = { workspace = true, features = ["executor"] }
+governor = { workspace = true }
 homedir = { workspace = true }
 humansize = { workspace = true }
 indicatif = { workspace = true }
@@ -46,6 +48,7 @@ mimalloc = { workspace = true }
 object_store = { workspace = true, features = ["aws"] }
 parquet = { workspace = true, features = ["async"] }
 rand = { workspace = true }
+rand_distr = { workspace = true }
 rayon = { workspace = true }
 regex = { workspace = true }
 reqwest = { workspace = true }
diff --git a/bench-vortex/benches/clickbench.rs b/bench-vortex/benches/clickbench.rs
index 88022ef88c3..dcdfc8d991b 100644
--- a/bench-vortex/benches/clickbench.rs
+++ b/bench-vortex/benches/clickbench.rs
@@ -41,7 +41,7 @@ fn benchmark(c: &mut Criterion) {
             .unwrap();
     }
 
-    let session_context = get_session_with_cache();
+    let session_context = get_session_with_cache(false);
     let context = session_context.clone();
 
     runtime.block_on(async move {
diff --git a/bench-vortex/benches/tpch.rs b/bench-vortex/benches/tpch.rs
index b39c0538a34..d04b0648aba 100644
--- a/bench-vortex/benches/tpch.rs
+++ b/bench-vortex/benches/tpch.rs
@@ -20,13 +20,14 @@ fn benchmark(c: &mut Criterion) {
             Format::InMemoryVortex {
                 enable_pushdown: true,
             },
+            false,
         ))
         .unwrap();
     let arrow_ctx = runtime
-        .block_on(load_datasets(&data_dir, Format::Arrow))
+        .block_on(load_datasets(&data_dir, Format::Arrow, false))
         .unwrap();
     let parquet_ctx = runtime
-        .block_on(load_datasets(&data_dir, Format::Parquet))
+        .block_on(load_datasets(&data_dir, Format::Parquet, false))
         .unwrap();
     let vortex_compressed_ctx = runtime
         .block_on(load_datasets(
@@ -34,6 +35,7 @@ fn benchmark(c: &mut Criterion) {
             Format::OnDiskVortex {
                 enable_compression: true,
             },
+            false,
         ))
         .unwrap();
 
diff --git a/bench-vortex/src/bin/clickbench.rs b/bench-vortex/src/bin/clickbench.rs
index 5e539fc8315..85128795a4d 100644
--- a/bench-vortex/src/bin/clickbench.rs
+++ b/bench-vortex/src/bin/clickbench.rs
@@ -37,6 +37,8 @@ struct Args {
     queries: Option<Vec<usize>>,
     #[arg(long, default_value = "false")]
     emit_plan: bool,
+    #[arg(long, default_value = "false")]
+    emulate_object_store: bool,
 }
 
 fn main() {
@@ -120,7 +122,7 @@ fn main() {
     let mut all_measurements = Vec::default();
 
     for format in &formats {
-        let session_context = get_session_with_cache();
+        let session_context = get_session_with_cache(args.emulate_object_store);
         let context = session_context.clone();
         match format {
             Format::Parquet => runtime.block_on(async {
diff --git a/bench-vortex/src/bin/tpch_benchmark.rs b/bench-vortex/src/bin/tpch_benchmark.rs
index 763559b3ece..e2a7a5465e6 100644
--- a/bench-vortex/src/bin/tpch_benchmark.rs
+++ b/bench-vortex/src/bin/tpch_benchmark.rs
@@ -33,6 +33,8 @@ struct Args {
     verbose: bool,
     #[arg(short, long, default_value_t, value_enum)]
     display_format: DisplayFormat,
+    #[arg(long, default_value = "false")]
+    emulate_object_store: bool,
 }
 
 fn main() -> ExitCode {
@@ -62,6 +64,7 @@ fn main() -> ExitCode {
         args.warmup,
         args.only_vortex,
         args.display_format,
+        args.emulate_object_store,
     ))
 }
 
@@ -72,6 +75,7 @@ async fn bench_main(
     data_dir: PathBuf,
     queries: Option<Vec<usize>>,
     iterations: usize,
     warmup: bool,
     only_vortex: bool,
     display_format: DisplayFormat,
+    emulate_object_store: bool,
 ) -> ExitCode {
     // uncomment the below to enable trace logging of datafusion execution
     // setup_logger(LevelFilter::Trace);
@@ -101,7 +105,7 @@ async fn bench_main(
     let ctxs = try_join_all(
         formats
             .iter()
-            .map(|format| load_datasets(&data_dir, *format)),
+            .map(|format| load_datasets(&data_dir, *format, emulate_object_store)),
     )
     .await
     .unwrap();
diff --git a/bench-vortex/src/blob.rs b/bench-vortex/src/blob.rs
new file mode 100644
index 00000000000..53e5c2a6f58
--- /dev/null
+++ b/bench-vortex/src/blob.rs
@@ -0,0 +1,150 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use datafusion::execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry};
+use futures::stream::BoxStream;
+use governor::{DefaultDirectRateLimiter, Quota};
+use object_store::path::Path;
+use object_store::{
+    GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts,
+    PutOptions, PutPayload, PutResult, Result as OSResult,
+};
+use rand::prelude::Distribution as _;
+use rand::thread_rng;
+use rand_distr::LogNormal;
+use reqwest::Url;
+
+#[derive(Debug)]
+pub struct SlowObjectStore {
+    inner: Arc<dyn ObjectStore>,
+    distribution: LogNormal<f64>,
+    rate_limiter: Arc<DefaultDirectRateLimiter>,
+}
+
+#[derive(Debug)]
+pub struct SlowObjectStoreRegistry {
+    pub inner: Arc<dyn ObjectStoreRegistry>,
+}
+
+impl Default for SlowObjectStoreRegistry {
+    fn default() -> Self {
+        Self {
+            inner: Arc::new(DefaultObjectStoreRegistry::default()) as _,
+        }
+    }
+}
+
+impl ObjectStoreRegistry for SlowObjectStoreRegistry {
+    fn register_store(
+        &self,
+        url: &Url,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        self.inner.register_store(url, store)
+    }
+
+    fn get_store(&self, url: &Url) -> datafusion_common::Result<Arc<dyn ObjectStore>> {
+        let store = self.inner.get_store(url)?;
+        Ok(Arc::new(SlowObjectStore::new(store)))
+    }
+}
+
+impl SlowObjectStore {
+    pub fn new(object_store: Arc<dyn ObjectStore>) -> Self {
+        Self {
+            inner: object_store,
+            distribution: LogNormal::new(4.7, 0.5).unwrap(), // p50 ~ 100ms, p95 ~ 250ms and p100 ~ 600ms
+            rate_limiter: Arc::new(DefaultDirectRateLimiter::direct(Quota::per_second(
+                (2_u32 << 30).try_into().unwrap(), // ~2 GiB/s
+            ))),
+        }
+    }
+
+    fn wait_time(&self) -> Duration {
+        let duration = (self.distribution.sample(&mut thread_rng()) as u64).clamp(30, 1_000);
+        Duration::from_millis(duration)
+    }
+
+    /// Injects an artificial sleep of somewhere between 30ms and a full second;
+    /// wait times land at roughly p50 ~ 100ms, p95 ~ 250ms and p100 ~ 600ms.
+    ///
+    /// We always wait at least 30ms, which seems to be the rough baseline for object store access.
+    async fn wait(&self) {
+        tokio::time::sleep(self.wait_time()).await;
+    }
+
+    /// Same as `Self::wait`, but with additional wait time according to the size of the response.
+    async fn wait_with_size(&self, size: usize) {
+        let base_wait_time = self.wait_time();
+        let additional_ms = size.div_ceil(65536) as u64; // 1ms per 64KB, roughly the median per-request throughput on S3
+        let total_time = base_wait_time + Duration::from_millis(additional_ms);
+        tokio::time::sleep(total_time).await;
+    }
+}
+
+impl std::fmt::Display for SlowObjectStore {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "SlowObjectStore({})", self.inner)
+    }
+}
+
+#[async_trait]
+impl ObjectStore for SlowObjectStore {
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> OSResult<PutResult> {
+        self.wait().await;
+        self.inner.put_opts(location, payload, opts).await
+    }
+
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> OSResult<Box<dyn MultipartUpload>> {
+        self.wait().await;
+        self.inner.put_multipart_opts(location, opts).await
+    }
+
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
+        // Ideally, we would tune `wait` here for the actual requested size if it exists in options.range
+        let r = self.inner.get_opts(location, options).await?;
+
+        self.wait_with_size(r.meta.size).await;
+        self.rate_limiter
+            .until_n_ready((r.meta.size as u32).try_into().unwrap())
+            .await
+            .unwrap();
+
+        Ok(r)
+    }
+
+    async fn delete(&self, location: &Path) -> OSResult<()> {
+        self.wait().await;
+        self.inner.delete(location).await
+    }
+
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, OSResult<ObjectMeta>> {
+        self.inner.list(prefix)
+    }
+
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
+        // This just makes listing super slow and it's not really the part we're interested in
+        // self.wait().await;
+        self.inner.list_with_delimiter(prefix).await
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> {
+        self.wait().await;
+        self.inner.copy(from, to).await
+    }
+
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
+        self.wait().await;
+        self.inner.copy_if_not_exists(from, to).await
+    }
+}
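As a sanity check on the latency model in `SlowObjectStore::new`: a log-normal over ln-milliseconds with mu = 4.7 and sigma = 0.5 has median exp(4.7) ~ 110ms and p95 exp(4.7 + 1.645 * 0.5) ~ 250ms, matching the comments above. A standalone sketch (not part of the change) that samples the same clamped distribution and the 1ms-per-64KiB size penalty from `wait_with_size`:

use rand::prelude::Distribution;
use rand_distr::LogNormal;

fn main() {
    // Same parameters and 30ms..1s clamp as SlowObjectStore::wait_time.
    let latency = LogNormal::new(4.7, 0.5).unwrap();
    let mut rng = rand::thread_rng();
    let mut samples: Vec<u64> = (0..100_000)
        .map(|_| (latency.sample(&mut rng) as u64).clamp(30, 1_000))
        .collect();
    samples.sort_unstable();

    let pct = |p: f64| samples[(p * (samples.len() - 1) as f64) as usize];
    println!(
        "p50 = {}ms, p95 = {}ms, max = {}ms",
        pct(0.50),
        pct(0.95),
        samples[samples.len() - 1]
    );

    // Size-based add-on: one extra millisecond per 64 KiB of response,
    // e.g. an 8 MiB object adds 8 * 1024 / 64 = 128ms on top of the sampled latency.
    let size: usize = 8 * 1024 * 1024;
    println!("additional wait for {size} bytes = {}ms", size.div_ceil(65536));
}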
diff --git a/bench-vortex/src/clickbench.rs b/bench-vortex/src/clickbench.rs
index 16660024853..2d082746037 100644
--- a/bench-vortex/src/clickbench.rs
+++ b/bench-vortex/src/clickbench.rs
@@ -229,7 +229,7 @@ pub async fn register_vortex_files(
                     .expect("Failed to write Vortex file")
             })
         })
-        .buffered(16)
+        .buffer_unordered(16)
         .try_collect::<Vec<_>>()
         .await?;
 
diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs
index e8d057f60e8..60a7b58a827 100644
--- a/bench-vortex/src/lib.rs
+++ b/bench-vortex/src/lib.rs
@@ -9,8 +9,10 @@ use std::sync::{Arc, LazyLock};
 use std::time::Duration;
 
 use arrow_array::{RecordBatch, RecordBatchReader};
+use blob::SlowObjectStoreRegistry;
 use datafusion::execution::cache::cache_manager::CacheManagerConfig;
 use datafusion::execution::cache::cache_unit::{DefaultFileStatisticsCache, DefaultListFilesCache};
+use datafusion::execution::object_store::DefaultObjectStoreRegistry;
 use datafusion::execution::runtime_env::RuntimeEnvBuilder;
 use datafusion::prelude::{SessionConfig, SessionContext};
 use datafusion_physical_plan::{collect, ExecutionPlan};
@@ -31,6 +33,7 @@ use crate::data_downloads::FileType;
 use crate::reader::BATCH_SIZE;
 use crate::taxi_data::taxi_data_parquet;
 
+pub mod blob;
 pub mod clickbench;
 pub mod data_downloads;
 pub mod display;
@@ -314,17 +317,23 @@ impl Measurement {
     }
 }
 
-pub fn get_session_with_cache() -> SessionContext {
-    let cache_config = CacheManagerConfig::default();
+pub fn get_session_with_cache(emulate_object_store: bool) -> SessionContext {
+    let registry = if emulate_object_store {
+        Arc::new(SlowObjectStoreRegistry::default()) as _
+    } else {
+        Arc::new(DefaultObjectStoreRegistry::new()) as _
+    };
+
     let file_static_cache = Arc::new(DefaultFileStatisticsCache::default());
     let list_file_cache = Arc::new(DefaultListFilesCache::default());
 
-    let cache_config = cache_config
+    let cache_config = CacheManagerConfig::default()
         .with_files_statistics_cache(Some(file_static_cache))
         .with_list_files_cache(Some(list_file_cache));
 
     let rt = RuntimeEnvBuilder::new()
         .with_cache_manager(cache_config)
+        .with_object_store_registry(registry)
         .build_arc()
         .expect("could not build runtime environment");
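With the registry wired into the `RuntimeEnv`, any store a query resolves through this session comes back wrapped in `SlowObjectStore`. A hedged usage sketch; the `file:///` URL and `LocalFileSystem` registration below are illustrative only, not part of this change:

use std::sync::Arc;

use bench_vortex::get_session_with_cache;
use object_store::local::LocalFileSystem;
use url::Url;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // `true` selects SlowObjectStoreRegistry, so store lookups pay the emulated
    // latency and the byte-throughput budget.
    let ctx = get_session_with_cache(true);

    // Stores registered on the context are looked up through the registry later,
    // which is what wraps them in SlowObjectStore.
    let url = Url::parse("file:///")?;
    ctx.register_object_store(&url, Arc::new(LocalFileSystem::new()));

    // Tables registered under this URL (e.g. Parquet or Vortex files on local disk)
    // are now read through SlowObjectStore and see object-store-like timings.
    Ok(())
}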
diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs
index 954b1a79091..c0eae9a036f 100644
--- a/bench-vortex/src/tpch/mod.rs
+++ b/bench-vortex/src/tpch/mod.rs
@@ -24,7 +24,9 @@ use vortex_datafusion::memory::VortexMemTableOptions;
 use vortex_datafusion::persistent::VortexFormat;
 use vortex_datafusion::SessionContextExt;
 
-use crate::{idempotent_async, Format, CTX, TARGET_BLOCK_BYTESIZE, TARGET_BLOCK_SIZE};
+use crate::{
+    get_session_with_cache, idempotent_async, Format, CTX, TARGET_BLOCK_BYTESIZE, TARGET_BLOCK_SIZE,
+};
 
 pub mod dbgen;
 mod execute;
@@ -40,8 +42,9 @@ pub const EXPECTED_ROW_COUNTS: [usize; 23] = [
 pub async fn load_datasets<P: AsRef<Path>>(
     base_dir: P,
     format: Format,
+    emulate_object_store: bool,
 ) -> anyhow::Result<SessionContext> {
-    let context = SessionContext::new();
+    let context = get_session_with_cache(emulate_object_store);
     let base_dir = base_dir.as_ref();
 
     let customer = base_dir.join("customer.tbl");
diff --git a/encodings/runend/Cargo.toml b/encodings/runend/Cargo.toml
index e08d80cfba4..f9a14433c61 100644
--- a/encodings/runend/Cargo.toml
+++ b/encodings/runend/Cargo.toml
@@ -24,7 +24,6 @@ vortex-dtype = { workspace = true }
 vortex-error = { workspace = true }
 vortex-scalar = { workspace = true }
 
-
 [lints]
 workspace = true
 
diff --git a/fuzz/Cargo.toml b/fuzz/Cargo.toml
index e0f8996c589..574caa0149e 100644
--- a/fuzz/Cargo.toml
+++ b/fuzz/Cargo.toml
@@ -18,11 +18,11 @@ readme = "README.md"
 cargo-fuzz = true
 
 [dependencies]
-libfuzzer-sys = { workspace = true }
-arrow-ord = { workspace = true }
 arrow-buffer = { workspace = true }
+arrow-ord = { workspace = true }
 bytes = { workspace = true }
 futures-util = { workspace = true }
+libfuzzer-sys = { workspace = true }
 tokio = { workspace = true, features = ["full"] }
 vortex-array = { workspace = true, features = ["arbitrary"] }
 vortex-buffer = { workspace = true }
diff --git a/vortex-expr/Cargo.toml b/vortex-expr/Cargo.toml
index 5a459c2d83a..196636598db 100644
--- a/vortex-expr/Cargo.toml
+++ b/vortex-expr/Cargo.toml
@@ -38,7 +38,6 @@ vortex-scalar = { workspace = true }
 [dev-dependencies]
 vortex-expr = { path = ".", features = ["test-harness"] }
 
-
 [features]
 datafusion = [
     "dep:datafusion-expr",
diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml
index a02d2d982d2..c490fa1966e 100644
--- a/vortex-file/Cargo.toml
+++ b/vortex-file/Cargo.toml
@@ -46,8 +46,8 @@ vortex-scan = { workspace = true }
 arrow-schema = { workspace = true }
 rstest = { workspace = true }
 tokio = { workspace = true, features = ["full"] }
-vortex-io = { path = "../vortex-io", features = ["tokio"] }
 vortex-array = { workspace = true, features = ["test-harness"] }
+vortex-io = { path = "../vortex-io", features = ["tokio"] }
 
 [lints]
 workspace = true
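For reference, the `Quota` in `SlowObjectStore::new` is `2_u32 << 30` = 2^31 bytes, i.e. roughly a 2 GiB/s byte budget that `until_n_ready` debits once per GET with the response size. A self-contained sketch of the same governor pattern, using a deliberately tiny 1 MiB/s budget (illustrative only, not from this change) so the throttling is actually observable:

use std::num::NonZeroU32;
use std::time::Instant;

use governor::{Quota, RateLimiter};

#[tokio::main]
async fn main() {
    // Replenish (and burst) budget of 1 MiB per second, far below the ~2 GiB/s used
    // by SlowObjectStore, so the second "request" below visibly waits.
    let limiter = RateLimiter::direct(Quota::per_second(NonZeroU32::new(1 << 20).unwrap()));

    let start = Instant::now();
    for bytes in [512 * 1024_u32, 768 * 1024] {
        // Block until the byte budget can cover this request, mirroring the
        // `until_n_ready(meta.size)` call in SlowObjectStore::get_opts.
        limiter
            .until_n_ready(NonZeroU32::new(bytes).unwrap())
            .await
            .expect("request larger than the burst budget");
        println!("granted {bytes} bytes at {:?}", start.elapsed());
    }
}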