Commit b23c0c7

Add support for in-memory TPCH tests
1 parent bad700d commit b23c0c7

File tree

6 files changed (+250, -53 lines)


Cargo.lock

Lines changed: 3 additions & 0 deletions
(Generated file; diff not rendered.)

Cargo.toml

Lines changed: 2 additions & 1 deletion
@@ -3,6 +3,7 @@ members = ["benchmarks"]
 
 [workspace.dependencies]
 datafusion = { version = "49.0.0" }
+datafusion-proto = { version = "49.0.0" }
 
 [package]
 name = "datafusion-distributed"
@@ -11,7 +12,7 @@ edition = "2021"
 
 [dependencies]
 datafusion = { workspace = true }
-datafusion-proto = { version = "49.0.0" }
+datafusion-proto = { workspace = true }
 arrow-flight = "55.2.0"
 async-trait = "0.1.88"
 tokio = { version = "1.46.1", features = ["full"] }

benchmarks/Cargo.toml

Lines changed: 4 additions & 6 deletions
@@ -6,6 +6,7 @@ default-run = "dfbench"
 
 [dependencies]
 datafusion = { workspace = true }
+datafusion-proto = { workspace = true }
 datafusion-distributed = { path = "..", features = ["integration"] }
 tokio = { version = "1.46.1", features = ["full"] }
 parquet = { version = "55.2.0" }
@@ -15,14 +16,11 @@ serde = "1.0.219"
 serde_json = "1.0.141"
 env_logger = "0.11.8"
 async-trait = "0.1.88"
-datafusion-proto = { version = "49.0.0", optional = true }
 chrono = "0.4.41"
+futures = "0.3.31"
+dashmap = "6.1.0"
+prost = "0.13.5"
 
 [[bin]]
 name = "dfbench"
 path = "src/bin/dfbench.rs"
-
-[features]
-ci = [
-    "datafusion-proto"
-]

benchmarks/src/tpch/run.rs

Lines changed: 26 additions & 25 deletions
@@ -34,17 +34,21 @@ use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
-use datafusion::datasource::{MemTable, TableProvider};
+use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::{SessionState, SessionStateBuilder};
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::{collect, displayable};
 use datafusion::prelude::*;
 
-use crate::util::{print_memory_stats, BenchmarkRun, CommonOpt, QueryResult};
+use crate::util::{
+    BenchmarkRun, CommonOpt, InMemoryCacheExecCodec, InMemoryDataSourceRule, QueryResult,
+    WarmingUpMarker,
+};
 use datafusion_distributed::test_utils::localhost::start_localhost_context;
 use datafusion_distributed::{
-    DistributedPhysicalOptimizerRule, DistributedSessionBuilder, DistributedSessionBuilderContext,
+    DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilder,
+    DistributedSessionBuilderContext,
 };
 use log::info;
 use structopt::StructOpt;
@@ -115,18 +119,23 @@ pub struct RunOpt {
 impl DistributedSessionBuilder for RunOpt {
     async fn build_session_state(
         &self,
-        _ctx: DistributedSessionBuilderContext,
+        ctx: DistributedSessionBuilderContext,
     ) -> Result<SessionState, DataFusionError> {
         let mut builder = SessionStateBuilder::new().with_default_features();
 
         let config = self
             .common
             .config()?
             .with_collect_statistics(!self.disable_statistics)
+            .with_distributed_user_codec(InMemoryCacheExecCodec)
+            .with_distributed_option_extension_from_headers::<WarmingUpMarker>(&ctx.headers)?
             .with_target_partitions(self.partitions());
 
         let rt_builder = self.common.runtime_env_builder()?;
 
+        if self.mem_table {
+            builder = builder.with_physical_optimizer_rule(Arc::new(InMemoryDataSourceRule));
+        }
         if self.distributed {
             let mut rule = DistributedPhysicalOptimizerRule::new();
             if let Some(partitions_per_task) = self.partitions_per_task {
@@ -191,8 +200,18 @@ impl RunOpt {
 
         let sql = &get_query_sql(query_id)?;
 
-        let single_node_ctx = SessionContext::new();
-        self.register_tables(&single_node_ctx).await?;
+        // Warm up the cache for the in-memory mode.
+        if self.mem_table {
+            // Put the WarmingUpMarker in the context; otherwise queries will fail because the
+            // InMemoryCacheExec node will think they should already be warmed up.
+            let ctx = ctx
+                .clone()
+                .with_distributed_option_extension(WarmingUpMarker::warming_up())?;
+            for query in sql.iter() {
+                self.execute_query(&ctx, query).await?;
+            }
+            println!("Query {query_id} data loaded in memory");
+        }
 
         for i in 0..self.iterations() {
             let start = Instant::now();
@@ -225,30 +244,12 @@ impl RunOpt {
         let avg = millis.iter().sum::<f64>() / millis.len() as f64;
         println!("Query {query_id} avg time: {avg:.2} ms");
 
-        // Print memory stats using mimalloc (only when compiled with --features mimalloc_extended)
-        print_memory_stats();
-
         Ok(query_results)
     }
 
     async fn register_tables(&self, ctx: &SessionContext) -> Result<()> {
         for table in TPCH_TABLES {
-            let table_provider = { self.get_table(ctx, table).await? };
-
-            if self.mem_table {
-                println!("Loading table '{table}' into memory");
-                let start = Instant::now();
-                let memtable =
-                    MemTable::load(table_provider, Some(self.partitions()), &ctx.state()).await?;
-                println!(
-                    "Loaded table '{}' into memory in {} ms",
-                    table,
-                    start.elapsed().as_millis()
-                );
-                ctx.register_table(*table, Arc::new(memtable))?;
-            } else {
-                ctx.register_table(*table, table_provider)?;
-            }
+            ctx.register_table(*table, self.get_table(ctx, table).await?)?;
         }
         Ok(())
     }
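
For context, a hedged sketch (not part of this commit) of the warm-up flow the benchmark now relies on: one pass with WarmingUpMarker set lets InMemoryCacheExec execute its inner plan and fill the cache, and the timed iterations then run purely from memory. The helper name warm_then_run is illustrative; it assumes a SessionContext built through the session builder above, with WarmingUpMarker and the DistributedExt trait in scope.

// Illustrative only: `warm_then_run` is a hypothetical helper, not part of the commit.
async fn warm_then_run(ctx: &SessionContext, queries: &[String]) -> Result<()> {
    // Warm-up pass: the marker tells InMemoryCacheExec it may execute its inner plan
    // and populate the global cache keyed by (plan display, partition).
    let warm_ctx = ctx
        .clone()
        .with_distributed_option_extension(WarmingUpMarker::warming_up())?;
    for query in queries {
        warm_ctx.sql(query).await?.collect().await?;
    }
    // Timed pass: without the marker, any partition that was never cached fails fast
    // instead of silently reading from disk.
    for query in queries {
        ctx.sql(query).await?.collect().await?;
    }
    Ok(())
}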

benchmarks/src/util/memory.rs

Lines changed: 214 additions & 20 deletions
@@ -1,21 +1,215 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-/// Print Peak RSS, Peak Commit, Page Faults based on mimalloc api
-pub fn print_memory_stats() {
-    // removed as not used in this project.
+use dashmap::{DashMap, Entry};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::common::tree_node::{Transformed, TreeNode};
+use datafusion::common::{exec_err, extensions_options, plan_err};
+use datafusion::config::{ConfigExtension, ConfigOptions};
+use datafusion::error::DataFusionError;
+use datafusion::execution::{FunctionRegistry, SendableRecordBatchStream, TaskContext};
+use datafusion::physical_optimizer::PhysicalOptimizerRule;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{
+    displayable, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+};
+use datafusion_proto::physical_plan::PhysicalExtensionCodec;
+use futures::{FutureExt, StreamExt};
+use prost::Message;
+use std::any::Any;
+use std::fmt::Formatter;
+use std::sync::{Arc, LazyLock};
+use tokio::sync::OnceCell;
+
+type Key = (String, usize);
+type Value = Arc<OnceCell<Vec<RecordBatch>>>;
+static CACHE: LazyLock<DashMap<Key, Value>> = LazyLock::new(DashMap::default);
+
+/// Caches all the record batches in a global [CACHE] on the first run, and serves
+/// them from the cache in any subsequent run.
+#[derive(Debug, Clone)]
+pub struct InMemoryCacheExec {
+    inner: Arc<dyn ExecutionPlan>,
+}
+
+extensions_options! {
+    /// Marker used by the [InMemoryCacheExec] that determines whether it is fine
+    /// to load data from disk because we are warming up, or not.
+    ///
+    /// If this marker is not present during InMemoryCacheExec::execute(), and
+    /// the data was not loaded in memory already, the query will fail.
+    pub struct WarmingUpMarker {
+        is_warming_up: bool, default = false
+    }
+}
+
+impl ConfigExtension for WarmingUpMarker {
+    const PREFIX: &'static str = "in-memory-cache-exec";
+}
+
+impl WarmingUpMarker {
+    pub fn warming_up() -> Self {
+        Self {
+            is_warming_up: true,
+        }
+    }
+}
+
+impl ExecutionPlan for InMemoryCacheExec {
+    fn name(&self) -> &str {
+        "InMemoryDataSourceExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        self.inner.properties()
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.inner]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(Self {
+            inner: children[0].clone(),
+        }))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> datafusion::common::Result<SendableRecordBatchStream> {
+        let once = {
+            let inner_display = displayable(self.inner.as_ref()).one_line().to_string();
+            let entry = CACHE.entry((inner_display, partition));
+            if matches!(entry, Entry::Vacant(_))
+                && !context
+                    .session_config()
+                    .options()
+                    .extensions
+                    .get::<WarmingUpMarker>()
+                    .map(|v| v.is_warming_up)
+                    .unwrap_or_default()
+            {
+                return exec_err!("InMemoryCacheExec is not yet warmed up");
+            }
+            let once = entry.or_insert(Arc::new(OnceCell::new()));
+            once.value().clone()
+        };
+
+        let inner = Arc::clone(&self.inner);
+
+        let stream = async move {
+            let batches = once
+                .get_or_try_init(|| async move {
+                    let mut stream = inner.execute(partition, context)?;
+                    let mut batches = vec![];
+                    while let Some(batch) = stream.next().await {
+                        batches.push(batch?);
+                    }
+                    Ok::<_, DataFusionError>(batches)
+                })
+                .await?;
+            Ok(batches.clone())
+        }
+        .into_stream()
+        .map(|v| match v {
+            Ok(batch) => futures::stream::iter(batch.into_iter().map(Ok)).boxed(),
+            Err(err) => futures::stream::once(async { Err(err) }).boxed(),
+        })
+        .flatten();

+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.inner.schema(),
+            stream,
+        )))
+    }
+}
+
+impl DisplayAs for InMemoryCacheExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        writeln!(f, "InMemoryDataSourceExec")
+    }
+}
+
+#[derive(Clone, PartialEq, ::prost::Message)]
+struct InMemoryCacheExecProto {
+    #[prost(string, tag = "1")]
+    name: String,
+}
+
+#[derive(Debug)]
+pub struct InMemoryCacheExecCodec;
+
+impl PhysicalExtensionCodec for InMemoryCacheExecCodec {
+    fn try_decode(
+        &self,
+        buf: &[u8],
+        inputs: &[Arc<dyn ExecutionPlan>],
+        _registry: &dyn FunctionRegistry,
+    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
+        let Ok(proto) = InMemoryCacheExecProto::decode(buf) else {
+            return plan_err!("no InMemoryDataSourceExecProto");
+        };
+        if proto.name != "InMemoryDataSourceExec" {
+            return plan_err!("unsupported InMemoryDataSourceExec proto: {:?}", proto.name);
+        };
+        Ok(Arc::new(InMemoryCacheExec {
+            inner: inputs[0].clone(),
+        }))
+    }
+
+    fn try_encode(
+        &self,
+        node: Arc<dyn ExecutionPlan>,
+        buf: &mut Vec<u8>,
+    ) -> datafusion::common::Result<()> {
+        if !node.as_any().is::<InMemoryCacheExec>() {
+            return plan_err!("no InMemoryDataSourceExec");
+        };
+        let proto = InMemoryCacheExecProto {
+            name: "InMemoryDataSourceExec".to_string(),
+        };
+        let Ok(_) = proto.encode(buf) else {
+            return plan_err!("no InMemoryDataSourceExecProto");
+        };
+
+        Ok(())
+    }
+}
+
+/// Wraps any plan without children with an [InMemoryCacheExec] node.
+#[derive(Debug)]
+pub struct InMemoryDataSourceRule;
+
+impl PhysicalOptimizerRule for InMemoryDataSourceRule {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
+        Ok(plan
+            .transform_up(|plan| {
+                if plan.children().is_empty() {
+                    Ok(Transformed::yes(Arc::new(InMemoryCacheExec {
+                        inner: plan.clone(),
+                    })))
+                } else {
+                    Ok(Transformed::no(plan))
+                }
+            })?
+            .data)
+    }
+
+    fn name(&self) -> &str {
+        "InMemoryDataSourceRule"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
 }
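
Beyond the benchmark harness, here is a hedged sketch (not part of the commit) of how the two new pieces are meant to be wired together, mirroring RunOpt::build_session_state above: the optimizer rule wraps every leaf plan in an InMemoryCacheExec, and the codec lets that node be re-created from protobuf on distributed workers. The helper name build_in_memory_state is illustrative; it assumes the DistributedExt trait from datafusion_distributed and the types defined above are in scope.

use std::sync::Arc;

use datafusion::execution::{SessionState, SessionStateBuilder};
use datafusion::prelude::SessionConfig;
use datafusion_distributed::DistributedExt;

// Illustrative only: `build_in_memory_state` is a hypothetical helper, not part of the commit.
fn build_in_memory_state() -> SessionState {
    // Register the codec so InMemoryCacheExec survives the plan (de)serialization
    // round-trip between distributed tasks.
    let config = SessionConfig::new().with_distributed_user_codec(InMemoryCacheExecCodec);
    SessionStateBuilder::new()
        .with_default_features()
        .with_config(config)
        // Wrap every childless (leaf) node so the first warmed-up run fills CACHE
        // and subsequent runs are served from memory.
        .with_physical_optimizer_rule(Arc::new(InMemoryDataSourceRule))
        .build()
}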
