3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -3,6 +3,7 @@ members = ["benchmarks"]

[workspace.dependencies]
datafusion = { version = "49.0.0" }
datafusion-proto = { version = "49.0.0" }

[package]
name = "datafusion-distributed"
@@ -11,7 +12,7 @@ edition = "2021"

[dependencies]
datafusion = { workspace = true }
datafusion-proto = { version = "49.0.0" }
datafusion-proto = { workspace = true }
arrow-flight = "55.2.0"
async-trait = "0.1.88"
tokio = { version = "1.46.1", features = ["full"] }
10 changes: 4 additions & 6 deletions benchmarks/Cargo.toml
@@ -6,6 +6,7 @@ default-run = "dfbench"

[dependencies]
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
datafusion-distributed = { path = "..", features = ["integration"] }
tokio = { version = "1.46.1", features = ["full"] }
parquet = { version = "55.2.0" }
@@ -15,14 +16,11 @@ serde = "1.0.219"
serde_json = "1.0.141"
env_logger = "0.11.8"
async-trait = "0.1.88"
datafusion-proto = { version = "49.0.0", optional = true }
chrono = "0.4.41"
futures = "0.3.31"
dashmap = "6.1.0"
prost = "0.13.5"

[[bin]]
name = "dfbench"
path = "src/bin/dfbench.rs"

[features]
ci = [
"datafusion-proto"
]
51 changes: 26 additions & 25 deletions benchmarks/src/tpch/run.rs
@@ -34,17 +34,21 @@ use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::{SessionState, SessionStateBuilder};
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;

use crate::util::{print_memory_stats, BenchmarkRun, CommonOpt, QueryResult};
use crate::util::{
BenchmarkRun, CommonOpt, InMemoryCacheExecCodec, InMemoryDataSourceRule, QueryResult,
WarmingUpMarker,
};
use datafusion_distributed::test_utils::localhost::start_localhost_context;
use datafusion_distributed::{
DistributedPhysicalOptimizerRule, DistributedSessionBuilder, DistributedSessionBuilderContext,
DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilder,
DistributedSessionBuilderContext,
};
use log::info;
use structopt::StructOpt;
@@ -115,18 +119,23 @@ pub struct RunOpt {
impl DistributedSessionBuilder for RunOpt {
async fn build_session_state(
&self,
_ctx: DistributedSessionBuilderContext,
ctx: DistributedSessionBuilderContext,
) -> Result<SessionState, DataFusionError> {
let mut builder = SessionStateBuilder::new().with_default_features();

let config = self
.common
.config()?
.with_collect_statistics(!self.disable_statistics)
.with_distributed_user_codec(InMemoryCacheExecCodec)
.with_distributed_option_extension_from_headers::<WarmingUpMarker>(&ctx.headers)?
.with_target_partitions(self.partitions());

let rt_builder = self.common.runtime_env_builder()?;

if self.mem_table {
builder = builder.with_physical_optimizer_rule(Arc::new(InMemoryDataSourceRule));
}
if self.distributed {
let mut rule = DistributedPhysicalOptimizerRule::new();
if let Some(partitions_per_task) = self.partitions_per_task {
@@ -191,8 +200,18 @@ impl RunOpt {

let sql = &get_query_sql(query_id)?;

let single_node_ctx = SessionContext::new();
self.register_tables(&single_node_ctx).await?;
// Warm up the cache for the in-memory mode.
if self.mem_table {
// Put the WarmingUpMarker in the context; otherwise queries will fail because the
// InMemoryCacheExec node will assume the data should already be warmed up.
let ctx = ctx
.clone()
.with_distributed_option_extension(WarmingUpMarker::warming_up())?;
for query in sql.iter() {
self.execute_query(&ctx, query).await?;
}
println!("Query {query_id} data loaded in memory");
}

for i in 0..self.iterations() {
let start = Instant::now();
@@ -225,30 +244,12 @@ impl RunOpt {
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
println!("Query {query_id} avg time: {avg:.2} ms");

// Print memory stats using mimalloc (only when compiled with --features mimalloc_extended)
print_memory_stats();

Ok(query_results)
}

async fn register_tables(&self, ctx: &SessionContext) -> Result<()> {
for table in TPCH_TABLES {
let table_provider = { self.get_table(ctx, table).await? };

if self.mem_table {
println!("Loading table '{table}' into memory");
let start = Instant::now();
let memtable =
MemTable::load(table_provider, Some(self.partitions()), &ctx.state()).await?;
println!(
"Loaded table '{}' into memory in {} ms",
table,
start.elapsed().as_millis()
);
ctx.register_table(*table, Arc::new(memtable))?;
} else {
ctx.register_table(*table, table_provider)?;
}
ctx.register_table(*table, self.get_table(ctx, table).await?)?;
}
Ok(())
}
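For clarity, here is a rough, hypothetical sketch (not part of this PR) of the client-side warm-up step shown above. It reuses only calls that appear in this diff: DistributedExt::with_distributed_option_extension attaches the WarmingUpMarker to the driver context, and the distributed layer forwards it to the workers, where build_session_state restores it from the request headers via with_distributed_option_extension_from_headers.

use crate::util::WarmingUpMarker;
use datafusion::error::DataFusionError;
use datafusion::prelude::SessionContext;
use datafusion_distributed::DistributedExt;

// Hypothetical helper: run the given queries once with the warm-up marker attached.
async fn warm_up(ctx: &SessionContext, queries: &[String]) -> Result<(), DataFusionError> {
    // Clone the context and mark it as "warming up" so that InMemoryCacheExec is
    // allowed to read from disk and populate the global cache.
    let warm_ctx = ctx
        .clone()
        .with_distributed_option_extension(WarmingUpMarker::warming_up())?;
    for query in queries {
        // Run each query once; its batches end up cached per (plan, partition).
        warm_ctx.sql(query).await?.collect().await?;
    }
    Ok(())
}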
234 changes: 214 additions & 20 deletions benchmarks/src/util/memory.rs
@@ -1,21 +1,215 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/// Print Peak RSS, Peak Commit, Page Faults based on mimalloc api
pub fn print_memory_stats() {
// removed as not used in this project.
use dashmap::{DashMap, Entry};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::{exec_err, extensions_options, plan_err};
use datafusion::config::{ConfigExtension, ConfigOptions};
use datafusion::error::DataFusionError;
use datafusion::execution::{FunctionRegistry, SendableRecordBatchStream, TaskContext};
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
displayable, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use futures::{FutureExt, StreamExt};
use prost::Message;
use std::any::Any;
use std::fmt::Formatter;
use std::sync::{Arc, LazyLock};
use tokio::sync::OnceCell;

/// Cache key: the one-line display of the wrapped plan plus the partition index.
type Key = (String, usize);
/// Cache value: the batches materialized for that plan/partition, filled at most once.
type Value = Arc<OnceCell<Vec<RecordBatch>>>;
static CACHE: LazyLock<DashMap<Key, Value>> = LazyLock::new(DashMap::default);

/// Caches all the record batches in a global [CACHE] on the first run, and serves
/// them from the cache in any subsequent run.
#[derive(Debug, Clone)]
pub struct InMemoryCacheExec {
inner: Arc<dyn ExecutionPlan>,
}

extensions_options! {
/// Marker used by the [InMemoryCacheExec] that determines whether it is fine
/// to load data from disk because we are still warming up.
///
/// If this marker is not present during InMemoryCacheExec::execute() and
/// the data was not already loaded in memory, the query will fail.
pub struct WarmingUpMarker {
is_warming_up: bool, default = false
}
}

impl ConfigExtension for WarmingUpMarker {
const PREFIX: &'static str = "in-memory-cache-exec";
}

impl WarmingUpMarker {
pub fn warming_up() -> Self {
Self {
is_warming_up: true,
}
}
}
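As context for the marker above, here is a standalone, hypothetical sketch of how the extension round-trips through a session configuration using only stock DataFusion config APIs (the marker_round_trip helper is illustrative, not code from this PR):

use datafusion::prelude::SessionConfig;

fn marker_round_trip() -> bool {
    // Register the extension on the config...
    let config = SessionConfig::new().with_option_extension(WarmingUpMarker::warming_up());
    // ...and read it back the same way InMemoryCacheExec::execute does through the
    // TaskContext's session config.
    config
        .options()
        .extensions
        .get::<WarmingUpMarker>()
        .map(|m| m.is_warming_up)
        .unwrap_or_default()
}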

impl ExecutionPlan for InMemoryCacheExec {
fn name(&self) -> &str {
"InMemoryDataSourceExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &PlanProperties {
self.inner.properties()
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.inner]
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self {
inner: children[0].clone(),
}))
}

fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> datafusion::common::Result<SendableRecordBatchStream> {
let once = {
let inner_display = displayable(self.inner.as_ref()).one_line().to_string();
let entry = CACHE.entry((inner_display, partition));
if matches!(entry, Entry::Vacant(_))
&& !context
.session_config()
.options()
.extensions
.get::<WarmingUpMarker>()
.map(|v| v.is_warming_up)
.unwrap_or_default()
{
return exec_err!("InMemoryCacheExec is not yet warmed up");
}
let once = entry.or_insert(Arc::new(OnceCell::new()));
once.value().clone()
};

let inner = Arc::clone(&self.inner);

let stream = async move {
let batches = once
.get_or_try_init(|| async move {
let mut stream = inner.execute(partition, context)?;
let mut batches = vec![];
while let Some(batch) = stream.next().await {
batches.push(batch?);
}
Ok::<_, DataFusionError>(batches)
})
.await?;
Ok(batches.clone())
}
.into_stream()
.map(|v| match v {
Ok(batch) => futures::stream::iter(batch.into_iter().map(Ok)).boxed(),
Err(err) => futures::stream::once(async { Err(err) }).boxed(),
})
.flatten();

Ok(Box::pin(RecordBatchStreamAdapter::new(
self.inner.schema(),
stream,
)))
}
}

impl DisplayAs for InMemoryCacheExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
writeln!(f, "InMemoryDataSourceExec")
}
}

#[derive(Clone, PartialEq, ::prost::Message)]
struct InMemoryCacheExecProto {
#[prost(string, tag = "1")]
name: String,
}

#[derive(Debug)]
pub struct InMemoryCacheExecCodec;

impl PhysicalExtensionCodec for InMemoryCacheExecCodec {
fn try_decode(
&self,
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
_registry: &dyn FunctionRegistry,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
let Ok(proto) = InMemoryCacheExecProto::decode(buf) else {
return plan_err!("no InMemoryDataSourceExecProto");
};
if proto.name != "InMemoryDataSourceExec" {
return plan_err!("unsupported InMemoryDataSourceExec proto: {:?}", proto.name);
};
Ok(Arc::new(InMemoryCacheExec {
inner: inputs[0].clone(),
}))
}

fn try_encode(
&self,
node: Arc<dyn ExecutionPlan>,
buf: &mut Vec<u8>,
) -> datafusion::common::Result<()> {
if !node.as_any().is::<InMemoryCacheExec>() {
return plan_err!("no InMemoryDataSourceExec");
};
let proto = InMemoryCacheExecProto {
name: "InMemoryDataSourceExec".to_string(),
};
let Ok(_) = proto.encode(buf) else {
return plan_err!("no InMemoryDataSourceExecProto");
};

Ok(())
}
}

/// Wraps any plan without children (i.e. a leaf data source) in an [InMemoryCacheExec] node.
#[derive(Debug)]
pub struct InMemoryDataSourceRule;

impl PhysicalOptimizerRule for InMemoryDataSourceRule {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
Ok(plan
.transform_up(|plan| {
if plan.children().is_empty() {
Ok(Transformed::yes(Arc::new(InMemoryCacheExec {
inner: plan.clone(),
})))
} else {
Ok(Transformed::no(plan))
}
})?
.data)
}

fn name(&self) -> &str {
"InMemoryDataSourceRule"
}

fn schema_check(&self) -> bool {
true
}
}
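Putting the pieces of this module together, here is a minimal, hypothetical sketch of how a session could be wired so that leaf plans are cached in memory and distributed workers can decode the wrapper node. It assumes the datafusion-distributed APIs already used in benchmarks/src/tpch/run.rs above; the cached_session_state helper is illustrative, not part of this PR.

use std::sync::Arc;
use datafusion::error::DataFusionError;
use datafusion::execution::{SessionState, SessionStateBuilder};
use datafusion::prelude::SessionConfig;
use datafusion_distributed::DistributedExt;

fn cached_session_state() -> Result<SessionState, DataFusionError> {
    let config = SessionConfig::new()
        // Allow this session to load data from disk and fill the global cache.
        .with_option_extension(WarmingUpMarker::warming_up())
        // Let distributed workers decode InMemoryCacheExec nodes from serialized plans
        // (API from the datafusion-distributed crate, as used in run.rs above).
        .with_distributed_user_codec(InMemoryCacheExecCodec);

    Ok(SessionStateBuilder::new()
        .with_default_features()
        .with_config(config)
        // Wrap every leaf node (data source) in an InMemoryCacheExec.
        .with_physical_optimizer_rule(Arc::new(InMemoryDataSourceRule))
        .build())
}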