From b23c0c7de08a767643e278a4b81d401946f7ba72 Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Sun, 7 Sep 2025 08:42:03 +0200 Subject: [PATCH 1/3] Add support for in-memory TPCH tests --- Cargo.lock | 3 + Cargo.toml | 3 +- benchmarks/Cargo.toml | 10 +- benchmarks/src/tpch/run.rs | 51 ++++---- benchmarks/src/util/memory.rs | 234 +++++++++++++++++++++++++++++++--- benchmarks/src/util/mod.rs | 2 +- 6 files changed, 250 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0504aa7..78c84c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1147,12 +1147,15 @@ version = "0.1.0" dependencies = [ "async-trait", "chrono", + "dashmap", "datafusion", "datafusion-distributed", "datafusion-proto", "env_logger", + "futures", "log", "parquet", + "prost", "serde", "serde_json", "structopt", diff --git a/Cargo.toml b/Cargo.toml index f1e7cd0..174a05a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = ["benchmarks"] [workspace.dependencies] datafusion = { version = "49.0.0" } +datafusion-proto = { version = "49.0.0" } [package] name = "datafusion-distributed" @@ -11,7 +12,7 @@ edition = "2021" [dependencies] datafusion = { workspace = true } -datafusion-proto = { version = "49.0.0" } +datafusion-proto = { workspace = true } arrow-flight = "55.2.0" async-trait = "0.1.88" tokio = { version = "1.46.1", features = ["full"] } diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 41babb4..8cab583 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -6,6 +6,7 @@ default-run = "dfbench" [dependencies] datafusion = { workspace = true } +datafusion-proto = { workspace = true } datafusion-distributed = { path = "..", features = ["integration"] } tokio = { version = "1.46.1", features = ["full"] } parquet = { version = "55.2.0" } @@ -15,14 +16,11 @@ serde = "1.0.219" serde_json = "1.0.141" env_logger = "0.11.8" async-trait = "0.1.88" -datafusion-proto = { version = "49.0.0", optional = true } chrono = "0.4.41" +futures = "0.3.31" +dashmap = "6.1.0" +prost = "0.13.5" [[bin]] name = "dfbench" path = "src/bin/dfbench.rs" - -[features] -ci = [ - "datafusion-proto" -] \ No newline at end of file diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index b2ac282..310cddf 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -34,17 +34,21 @@ use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; -use datafusion::datasource::{MemTable, TableProvider}; +use datafusion::datasource::TableProvider; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::{SessionState, SessionStateBuilder}; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::{collect, displayable}; use datafusion::prelude::*; -use crate::util::{print_memory_stats, BenchmarkRun, CommonOpt, QueryResult}; +use crate::util::{ + BenchmarkRun, CommonOpt, InMemoryCacheExecCodec, InMemoryDataSourceRule, QueryResult, + WarmingUpMarker, +}; use datafusion_distributed::test_utils::localhost::start_localhost_context; use datafusion_distributed::{ - DistributedPhysicalOptimizerRule, DistributedSessionBuilder, DistributedSessionBuilderContext, + DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilder, + DistributedSessionBuilderContext, }; use log::info; use structopt::StructOpt; @@ -115,7 +119,7 @@ pub struct RunOpt { impl DistributedSessionBuilder for RunOpt { async fn build_session_state( &self, - _ctx: DistributedSessionBuilderContext, + ctx: DistributedSessionBuilderContext, ) -> Result { let mut builder = SessionStateBuilder::new().with_default_features(); @@ -123,10 +127,15 @@ impl DistributedSessionBuilder for RunOpt { .common .config()? .with_collect_statistics(!self.disable_statistics) + .with_distributed_user_codec(InMemoryCacheExecCodec) + .with_distributed_option_extension_from_headers::(&ctx.headers)? .with_target_partitions(self.partitions()); let rt_builder = self.common.runtime_env_builder()?; + if self.mem_table { + builder = builder.with_physical_optimizer_rule(Arc::new(InMemoryDataSourceRule)); + } if self.distributed { let mut rule = DistributedPhysicalOptimizerRule::new(); if let Some(partitions_per_task) = self.partitions_per_task { @@ -191,8 +200,18 @@ impl RunOpt { let sql = &get_query_sql(query_id)?; - let single_node_ctx = SessionContext::new(); - self.register_tables(&single_node_ctx).await?; + // Warmup the cache for the in-memory mode. + if self.mem_table { + // put the WarmingUpMarker in the context, otherwise, queries will fail as the + // InMemoryCacheExec node will think they should already be warmed up. + let ctx = ctx + .clone() + .with_distributed_option_extension(WarmingUpMarker::warming_up())?; + for query in sql.iter() { + self.execute_query(&ctx, query).await?; + } + println!("Query {query_id} data loaded in memory"); + } for i in 0..self.iterations() { let start = Instant::now(); @@ -225,30 +244,12 @@ impl RunOpt { let avg = millis.iter().sum::() / millis.len() as f64; println!("Query {query_id} avg time: {avg:.2} ms"); - // Print memory stats using mimalloc (only when compiled with --features mimalloc_extended) - print_memory_stats(); - Ok(query_results) } async fn register_tables(&self, ctx: &SessionContext) -> Result<()> { for table in TPCH_TABLES { - let table_provider = { self.get_table(ctx, table).await? }; - - if self.mem_table { - println!("Loading table '{table}' into memory"); - let start = Instant::now(); - let memtable = - MemTable::load(table_provider, Some(self.partitions()), &ctx.state()).await?; - println!( - "Loaded table '{}' into memory in {} ms", - table, - start.elapsed().as_millis() - ); - ctx.register_table(*table, Arc::new(memtable))?; - } else { - ctx.register_table(*table, table_provider)?; - } + ctx.register_table(*table, self.get_table(ctx, table).await?)?; } Ok(()) } diff --git a/benchmarks/src/util/memory.rs b/benchmarks/src/util/memory.rs index 2eb7ea5..8d7edf8 100644 --- a/benchmarks/src/util/memory.rs +++ b/benchmarks/src/util/memory.rs @@ -1,21 +1,215 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -/// Print Peak RSS, Peak Commit, Page Faults based on mimalloc api -pub fn print_memory_stats() { - // removed as not used in this project. +use dashmap::{DashMap, Entry}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::common::tree_node::{Transformed, TreeNode}; +use datafusion::common::{exec_err, extensions_options, plan_err}; +use datafusion::config::{ConfigExtension, ConfigOptions}; +use datafusion::error::DataFusionError; +use datafusion::execution::{FunctionRegistry, SendableRecordBatchStream, TaskContext}; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + displayable, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, +}; +use datafusion_proto::physical_plan::PhysicalExtensionCodec; +use futures::{FutureExt, StreamExt}; +use prost::Message; +use std::any::Any; +use std::fmt::Formatter; +use std::sync::{Arc, LazyLock}; +use tokio::sync::OnceCell; + +type Key = (String, usize); +type Value = Arc>>; +static CACHE: LazyLock> = LazyLock::new(DashMap::default); + +/// Caches all the record batches in a global [CACHE] on the first run, and serves +/// them from the cache in any subsequent run. +#[derive(Debug, Clone)] +pub struct InMemoryCacheExec { + inner: Arc, +} + +extensions_options! { + /// Marker used by the [InMemoryCacheExec] that determines wether its fine + /// to load data from disk because we are warming up, or not. + /// + /// If this marker is not present during InMemoryCacheExec::execute(), and + /// the data was not loaded in-memory already, the query will fail. + pub struct WarmingUpMarker { + is_warming_up: bool, default = false + } +} + +impl ConfigExtension for WarmingUpMarker { + const PREFIX: &'static str = "in-memory-cache-exec"; +} + +impl WarmingUpMarker { + pub fn warming_up() -> Self { + Self { + is_warming_up: true, + } + } +} + +impl ExecutionPlan for InMemoryCacheExec { + fn name(&self) -> &str { + "InMemoryDataSourceExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + self.inner.properties() + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.inner] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> datafusion::common::Result> { + Ok(Arc::new(Self { + inner: children[0].clone(), + })) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> datafusion::common::Result { + let once = { + let inner_display = displayable(self.inner.as_ref()).one_line().to_string(); + let entry = CACHE.entry((inner_display, partition)); + if matches!(entry, Entry::Vacant(_)) + && !context + .session_config() + .options() + .extensions + .get::() + .map(|v| v.is_warming_up) + .unwrap_or_default() + { + return exec_err!("InMemoryCacheExec is not yet warmed up"); + } + let once = entry.or_insert(Arc::new(OnceCell::new())); + once.value().clone() + }; + + let inner = Arc::clone(&self.inner); + + let stream = async move { + let batches = once + .get_or_try_init(|| async move { + let mut stream = inner.execute(partition, context)?; + let mut batches = vec![]; + while let Some(batch) = stream.next().await { + batches.push(batch?); + } + Ok::<_, DataFusionError>(batches) + }) + .await?; + Ok(batches.clone()) + } + .into_stream() + .map(|v| match v { + Ok(batch) => futures::stream::iter(batch.into_iter().map(Ok)).boxed(), + Err(err) => futures::stream::once(async { Err(err) }).boxed(), + }) + .flatten(); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.inner.schema(), + stream, + ))) + } +} + +impl DisplayAs for InMemoryCacheExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + writeln!(f, "InMemoryDataSourceExec") + } +} + +#[derive(Clone, PartialEq, ::prost::Message)] +struct InMemoryCacheExecProto { + #[prost(string, tag = "1")] + name: String, +} + +#[derive(Debug)] +pub struct InMemoryCacheExecCodec; + +impl PhysicalExtensionCodec for InMemoryCacheExecCodec { + fn try_decode( + &self, + buf: &[u8], + inputs: &[Arc], + _registry: &dyn FunctionRegistry, + ) -> datafusion::common::Result> { + let Ok(proto) = InMemoryCacheExecProto::decode(buf) else { + return plan_err!("no InMemoryDataSourceExecProto"); + }; + if proto.name != "InMemoryDataSourceExec" { + return plan_err!("unsupported InMemoryDataSourceExec proto: {:?}", proto.name); + }; + Ok(Arc::new(InMemoryCacheExec { + inner: inputs[0].clone(), + })) + } + + fn try_encode( + &self, + node: Arc, + buf: &mut Vec, + ) -> datafusion::common::Result<()> { + if !node.as_any().is::() { + return plan_err!("no InMemoryDataSourceExec"); + }; + let proto = InMemoryCacheExecProto { + name: "InMemoryDataSourceExec".to_string(), + }; + let Ok(_) = proto.encode(buf) else { + return plan_err!("no InMemoryDataSourceExecProto"); + }; + + Ok(()) + } +} + +/// Wraps any plan without children with an [InMemoryCacheExec] node. +#[derive(Debug)] +pub struct InMemoryDataSourceRule; + +impl PhysicalOptimizerRule for InMemoryDataSourceRule { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> datafusion::common::Result> { + Ok(plan + .transform_up(|plan| { + if plan.children().is_empty() { + Ok(Transformed::yes(Arc::new(InMemoryCacheExec { + inner: plan.clone(), + }))) + } else { + Ok(Transformed::no(plan)) + } + })? + .data) + } + + fn name(&self) -> &str { + "InMemoryDataSourceRule" + } + + fn schema_check(&self) -> bool { + true + } } diff --git a/benchmarks/src/util/mod.rs b/benchmarks/src/util/mod.rs index a38d37d..f8041cf 100644 --- a/benchmarks/src/util/mod.rs +++ b/benchmarks/src/util/mod.rs @@ -20,6 +20,6 @@ mod memory; mod options; mod run; -pub use memory::print_memory_stats; +pub use memory::{InMemoryCacheExecCodec, InMemoryDataSourceRule, WarmingUpMarker}; pub use options::CommonOpt; pub use run::{BenchmarkRun, QueryResult}; From 6301891418c312f27a6c16e585f1cdd5a26f8550 Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Wed, 10 Sep 2025 10:37:57 +0200 Subject: [PATCH 2/3] Fix typo --- benchmarks/src/util/memory.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/src/util/memory.rs b/benchmarks/src/util/memory.rs index 8d7edf8..aa19e5e 100644 --- a/benchmarks/src/util/memory.rs +++ b/benchmarks/src/util/memory.rs @@ -30,7 +30,7 @@ pub struct InMemoryCacheExec { } extensions_options! { - /// Marker used by the [InMemoryCacheExec] that determines wether its fine + /// Marker used by the [InMemoryCacheExec] that determines whether it's fine /// to load data from disk because we are warming up, or not. /// /// If this marker is not present during InMemoryCacheExec::execute(), and From 1e7c6e5918efa332510d286b1e001c148e5eb6dc Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Wed, 10 Sep 2025 10:45:07 +0200 Subject: [PATCH 3/3] Improve comment --- benchmarks/src/util/memory.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/benchmarks/src/util/memory.rs b/benchmarks/src/util/memory.rs index aa19e5e..f268dfb 100644 --- a/benchmarks/src/util/memory.rs +++ b/benchmarks/src/util/memory.rs @@ -24,6 +24,16 @@ static CACHE: LazyLock> = LazyLock::new(DashMap::default); /// Caches all the record batches in a global [CACHE] on the first run, and serves /// them from the cache in any subsequent run. +/// The order of events looks like this: +/// 1. A first query is run. +/// 2. Data is not cached, so it is gathered from the underlying node. +/// 3. The cache is populated with the recently gathered data. +/// 4. A second query is run. +/// 5. The cache is hit, and data is returned from there. +/// +/// The cache key includes the result of "explaining" the underlying node, so different +/// nodes applying different filters under the same parquet files will be cached +/// independently. #[derive(Debug, Clone)] pub struct InMemoryCacheExec { inner: Arc,