Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ libfuzzer-sys = "0.4"
log = "0.4.21"
mimalloc = "0.1.42"
moka = "0.12"
governor = "0.8"
num-traits = "0.2.18"
num_enum = "0.7.2"
object_store = "0.11.0"
Expand All @@ -116,6 +117,7 @@ pyo3 = { version = ">= 0.22", features = ["extension-module", "abi3-py310"] }
pyo3-log = ">= 0.11"
rancor = "0.1.0"
rand = "0.8.5"
rand_distr = "0.4"
rayon = "1.10.0"
regex = "1.11.0"
reqwest = { version = "0.12.0", features = ["blocking"] }
Expand Down
3 changes: 3 additions & 0 deletions bench-vortex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ anyhow = { workspace = true }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
bzip2 = { workspace = true }
clap = { workspace = true, features = ["derive"] }
Expand All @@ -37,6 +38,7 @@ datafusion-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
enum-iterator = { workspace = true }
futures = { workspace = true, features = ["executor"] }
governor = { workspace = true }
homedir = { workspace = true }
humansize = { workspace = true }
indicatif = { workspace = true }
Expand All @@ -46,6 +48,7 @@ mimalloc = { workspace = true }
object_store = { workspace = true, features = ["aws"] }
parquet = { workspace = true, features = ["async"] }
rand = { workspace = true }
rand_distr = { workspace = true }
rayon = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion bench-vortex/benches/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn benchmark(c: &mut Criterion) {
.unwrap();
}

let session_context = get_session_with_cache();
let session_context = get_session_with_cache(false);
let context = session_context.clone();

runtime.block_on(async move {
Expand Down
6 changes: 4 additions & 2 deletions bench-vortex/benches/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,22 @@ fn benchmark(c: &mut Criterion) {
Format::InMemoryVortex {
enable_pushdown: true,
},
false,
))
.unwrap();
let arrow_ctx = runtime
.block_on(load_datasets(&data_dir, Format::Arrow))
.block_on(load_datasets(&data_dir, Format::Arrow, false))
.unwrap();
let parquet_ctx = runtime
.block_on(load_datasets(&data_dir, Format::Parquet))
.block_on(load_datasets(&data_dir, Format::Parquet, false))
.unwrap();
let vortex_compressed_ctx = runtime
.block_on(load_datasets(
&data_dir,
Format::OnDiskVortex {
enable_compression: true,
},
false,
))
.unwrap();

Expand Down
4 changes: 3 additions & 1 deletion bench-vortex/src/bin/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ struct Args {
queries: Option<Vec<usize>>,
#[arg(long, default_value = "false")]
emit_plan: bool,
#[arg(long, default_value = "false")]
emulate_object_store: bool,
}

fn main() {
Expand Down Expand Up @@ -120,7 +122,7 @@ fn main() {
let mut all_measurements = Vec::default();

for format in &formats {
let session_context = get_session_with_cache();
let session_context = get_session_with_cache(args.emulate_object_store);
let context = session_context.clone();
match format {
Format::Parquet => runtime.block_on(async {
Expand Down
6 changes: 5 additions & 1 deletion bench-vortex/src/bin/tpch_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ struct Args {
verbose: bool,
#[arg(short, long, default_value_t, value_enum)]
display_format: DisplayFormat,
#[arg(long, default_value = "false")]
emulate_object_store: bool,
}

fn main() -> ExitCode {
Expand Down Expand Up @@ -62,6 +64,7 @@ fn main() -> ExitCode {
args.warmup,
args.only_vortex,
args.display_format,
args.emulate_object_store,
))
}

Expand All @@ -72,6 +75,7 @@ async fn bench_main(
warmup: bool,
only_vortex: bool,
display_format: DisplayFormat,
emulate_object_store: bool,
) -> ExitCode {
// uncomment the below to enable trace logging of datafusion execution
// setup_logger(LevelFilter::Trace);
Expand Down Expand Up @@ -101,7 +105,7 @@ async fn bench_main(
let ctxs = try_join_all(
formats
.iter()
.map(|format| load_datasets(&data_dir, *format)),
.map(|format| load_datasets(&data_dir, *format, emulate_object_store)),
)
.await
.unwrap();
Expand Down
138 changes: 138 additions & 0 deletions bench-vortex/src/blob.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use datafusion::execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry};
use futures::stream::BoxStream;
use governor::{DefaultDirectRateLimiter, Quota};
use object_store::path::Path;
use object_store::{
GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts,
PutOptions, PutPayload, PutResult, Result as OSResult,
};
use rand::prelude::Distribution as _;
use rand::thread_rng;
use rand_distr::LogNormal;
use reqwest::Url;

#[derive(Debug)]
pub struct SlowObjectStore {
inner: Arc<dyn ObjectStore>,
distribution: LogNormal<f32>,
rate_limiter: Arc<DefaultDirectRateLimiter>,
}

#[derive(Debug)]
pub struct SlowObjectStoreRegistry {
pub inner: Arc<dyn ObjectStoreRegistry>,
}

impl Default for SlowObjectStoreRegistry {
fn default() -> Self {
Self {
inner: Arc::new(DefaultObjectStoreRegistry::default()) as _,
}
}
}

impl ObjectStoreRegistry for SlowObjectStoreRegistry {
fn register_store(
&self,
url: &Url,
store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore>> {
self.inner.register_store(url, store)
}

fn get_store(&self, url: &Url) -> datafusion_common::Result<Arc<dyn ObjectStore>> {
let store = self.inner.get_store(url)?;
Ok(Arc::new(SlowObjectStore::new(store)))
}
}

impl SlowObjectStore {
pub fn new(object_store: Arc<dyn ObjectStore>) -> Self {
Self {
inner: object_store,
distribution: LogNormal::new(4.7, 0.5).unwrap(), //p50 ~ 100, p95 ~ 250 and p100 ~ 600
rate_limiter: Arc::new(DefaultDirectRateLimiter::direct(Quota::per_second(
(2_u32 << 30).try_into().unwrap(), // 1GB/s
))),
}
}

/// Injects an artificial sleep of somewhere between 30ms to a full second.
// wait times will p50 ~ 100ms, p95 ~ 250ms and p100 ~ 600ms.
///
/// We always wait at least 30ms, which seems to be the rough baseline for object store access.
async fn wait(&self) {
let duration = self.distribution.sample(&mut thread_rng()) as u64;
tokio::time::sleep(Duration::from_millis(duration.clamp(30, 1_000))).await;
}
}

impl std::fmt::Display for SlowObjectStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "SlowObjectStore({})", self.inner)
}
}

#[async_trait]
impl ObjectStore for SlowObjectStore {
async fn put_opts(
&self,
location: &Path,
payload: PutPayload,
opts: PutOptions,
) -> OSResult<PutResult> {
self.wait().await;
self.inner.put_opts(location, payload, opts).await
}

async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOpts,
) -> OSResult<Box<dyn MultipartUpload>> {
self.wait().await;
self.inner.put_multipart_opts(location, opts).await
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
// Ideally, we would tune `wait` here for the actual if it exists in options.range
self.wait().await;
let r = self.inner.get_opts(location, options).await?;
let size = r.meta.size as u32;
self.rate_limiter
.until_n_ready(size.try_into().unwrap())
.await
.unwrap();

Ok(r)
}

async fn delete(&self, location: &Path) -> OSResult<()> {
self.wait().await;
self.inner.delete(location).await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, OSResult<ObjectMeta>> {
self.inner.list(prefix)
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
// This just makes listing super slow and its not really the part we're interested in
// self.wait().await;
self.inner.list_with_delimiter(prefix).await
}

async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> {
self.wait().await;
self.inner.copy(from, to).await
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
self.wait().await;
self.copy_if_not_exists(from, to).await
}
}
Loading
Loading