Skip to content

Commit 2c3566c

Browse files
authored
doc: add example for cache factory (apache#19139)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Closes apache#18893. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 65a6bc4 commit 2c3566c

File tree

2 files changed

+238
-0
lines changed

2 files changed

+238
-0
lines changed
Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! See `main.rs` for how to run it.
19+
20+
use std::fmt::Debug;
21+
use std::hash::Hash;
22+
use std::sync::Arc;
23+
use std::sync::RwLock;
24+
25+
use arrow::array::RecordBatch;
26+
use async_trait::async_trait;
27+
use datafusion::catalog::memory::MemorySourceConfig;
28+
use datafusion::common::DFSchemaRef;
29+
use datafusion::error::Result;
30+
use datafusion::execution::SessionState;
31+
use datafusion::execution::SessionStateBuilder;
32+
use datafusion::execution::context::QueryPlanner;
33+
use datafusion::execution::session_state::CacheFactory;
34+
use datafusion::logical_expr::Extension;
35+
use datafusion::logical_expr::LogicalPlan;
36+
use datafusion::logical_expr::UserDefinedLogicalNode;
37+
use datafusion::logical_expr::UserDefinedLogicalNodeCore;
38+
use datafusion::physical_plan::ExecutionPlan;
39+
use datafusion::physical_plan::collect_partitioned;
40+
use datafusion::physical_planner::DefaultPhysicalPlanner;
41+
use datafusion::physical_planner::ExtensionPlanner;
42+
use datafusion::physical_planner::PhysicalPlanner;
43+
use datafusion::prelude::ParquetReadOptions;
44+
use datafusion::prelude::SessionContext;
45+
use datafusion::prelude::*;
46+
use datafusion_common::HashMap;
47+
48+
/// This example demonstrates how to leverage [CacheFactory] to implement custom caching strategies for dataframes in DataFusion.
49+
/// By default, [DataFrame::cache] in Datafusion is eager and creates an in-memory table. This example shows a basic alternative implementation for lazy caching.
50+
/// Specifically, it implements:
51+
/// - A [CustomCacheFactory] that creates a logical node [CacheNode] representing the cache operation.
52+
/// - A [CacheNodePlanner] (an [ExtensionPlanner]) that understands [CacheNode] and performs caching.
53+
/// - A [CacheNodeQueryPlanner] that installs [CacheNodePlanner].
54+
/// - A simple in-memory [CacheManager] that stores cached [RecordBatch]es. Note that the implementation for this example is very naive and only implements put, but for real production use cases cache eviction and drop should also be implemented.
55+
pub async fn cache_dataframe_with_custom_logic() -> Result<()> {
56+
let testdata = datafusion::test_util::parquet_test_data();
57+
let filename = &format!("{testdata}/alltypes_plain.parquet");
58+
59+
let session_state = SessionStateBuilder::new()
60+
.with_cache_factory(Some(Arc::new(CustomCacheFactory {})))
61+
.with_query_planner(Arc::new(CacheNodeQueryPlanner::default()))
62+
.build();
63+
let ctx = SessionContext::new_with_state(session_state);
64+
65+
// Read the parquet files and show its schema using 'describe'
66+
let parquet_df = ctx
67+
.read_parquet(filename, ParquetReadOptions::default())
68+
.await?;
69+
70+
let df_cached = parquet_df
71+
.select_columns(&["id", "bool_col", "timestamp_col"])?
72+
.filter(col("id").gt(lit(1)))?
73+
.cache()
74+
.await?;
75+
76+
let df1 = df_cached.clone().filter(col("bool_col").is_true())?;
77+
let df2 = df1.clone().sort(vec![col("id").sort(true, false)])?;
78+
79+
// should see log for caching only once
80+
df_cached.show().await?;
81+
df1.show().await?;
82+
df2.show().await?;
83+
84+
Ok(())
85+
}
86+
87+
#[derive(Debug)]
88+
struct CustomCacheFactory {}
89+
90+
impl CacheFactory for CustomCacheFactory {
91+
fn create(
92+
&self,
93+
plan: LogicalPlan,
94+
_session_state: &SessionState,
95+
) -> Result<LogicalPlan> {
96+
Ok(LogicalPlan::Extension(Extension {
97+
node: Arc::new(CacheNode { input: plan }),
98+
}))
99+
}
100+
}
101+
102+
/// Logical plan node marking `input` as a subtree whose results should be
/// cached. `Hash`/`Eq` are derived so the wrapped plan can serve as the key in
/// [CacheManager]'s `HashMap`.
#[derive(PartialEq, Eq, PartialOrd, Hash, Debug)]
struct CacheNode {
    input: LogicalPlan,
}
106+
107+
impl UserDefinedLogicalNodeCore for CacheNode {
108+
fn name(&self) -> &str {
109+
"CacheNode"
110+
}
111+
112+
fn inputs(&self) -> Vec<&LogicalPlan> {
113+
vec![&self.input]
114+
}
115+
116+
fn schema(&self) -> &DFSchemaRef {
117+
self.input.schema()
118+
}
119+
120+
fn expressions(&self) -> Vec<Expr> {
121+
vec![]
122+
}
123+
124+
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
125+
write!(f, "CacheNode")
126+
}
127+
128+
fn with_exprs_and_inputs(
129+
&self,
130+
_exprs: Vec<Expr>,
131+
mut inputs: Vec<LogicalPlan>,
132+
) -> Result<Self> {
133+
assert_eq!(inputs.len(), 1, "input size must be one");
134+
Ok(Self {
135+
input: inputs.swap_remove(0),
136+
})
137+
}
138+
}
139+
140+
/// Extension planner that turns a [CacheNode] into a physical plan, filling
/// the cache on first execution and serving from it afterwards.
struct CacheNodePlanner {
    // Shared with [CacheNodeQueryPlanner]; RwLock allows concurrent readers.
    cache_manager: Arc<RwLock<CacheManager>>,
}
143+
144+
#[async_trait]
145+
impl ExtensionPlanner for CacheNodePlanner {
146+
async fn plan_extension(
147+
&self,
148+
_planner: &dyn PhysicalPlanner,
149+
node: &dyn UserDefinedLogicalNode,
150+
logical_inputs: &[&LogicalPlan],
151+
physical_inputs: &[Arc<dyn ExecutionPlan>],
152+
session_state: &SessionState,
153+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
154+
if let Some(cache_node) = node.as_any().downcast_ref::<CacheNode>() {
155+
assert_eq!(logical_inputs.len(), 1, "Inconsistent number of inputs");
156+
assert_eq!(physical_inputs.len(), 1, "Inconsistent number of inputs");
157+
if self
158+
.cache_manager
159+
.read()
160+
.unwrap()
161+
.get(&cache_node.input)
162+
.is_none()
163+
{
164+
let ctx = session_state.task_ctx();
165+
println!("caching in memory");
166+
let batches =
167+
collect_partitioned(physical_inputs[0].clone(), ctx).await?;
168+
self.cache_manager
169+
.write()
170+
.unwrap()
171+
.put(cache_node.input.clone(), batches);
172+
} else {
173+
println!("fetching directly from cache manager");
174+
}
175+
Ok(self
176+
.cache_manager
177+
.read()
178+
.unwrap()
179+
.get(&cache_node.input)
180+
.map(|batches| {
181+
let exec: Arc<dyn ExecutionPlan> = MemorySourceConfig::try_new_exec(
182+
batches,
183+
physical_inputs[0].schema(),
184+
None,
185+
)
186+
.unwrap();
187+
exec
188+
}))
189+
} else {
190+
Ok(None)
191+
}
192+
}
193+
}
194+
195+
/// Query planner that owns the shared [CacheManager] and installs a
/// [CacheNodePlanner] on top of the default physical planner.
#[derive(Debug, Default)]
struct CacheNodeQueryPlanner {
    // Shared (via Arc::clone) with every CacheNodePlanner this planner creates.
    cache_manager: Arc<RwLock<CacheManager>>,
}
199+
200+
#[async_trait]
201+
impl QueryPlanner for CacheNodeQueryPlanner {
202+
async fn create_physical_plan(
203+
&self,
204+
logical_plan: &LogicalPlan,
205+
session_state: &SessionState,
206+
) -> Result<Arc<dyn ExecutionPlan>> {
207+
let physical_planner =
208+
DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
209+
CacheNodePlanner {
210+
cache_manager: Arc::clone(&self.cache_manager),
211+
},
212+
)]);
213+
physical_planner
214+
.create_physical_plan(logical_plan, session_state)
215+
.await
216+
}
217+
}
218+
219+
// This naive implementation only includes put, but for real production use cases cache eviction and drop should also be implemented.
#[derive(Debug, Default)]
struct CacheManager {
    // Keyed by the logical plan being cached; the value is the per-partition
    // output (one Vec<RecordBatch> per partition) of executing that plan.
    cache: HashMap<LogicalPlan, Vec<Vec<RecordBatch>>>,
}
224+
225+
impl CacheManager {
226+
pub fn put(&mut self, k: LogicalPlan, v: Vec<Vec<RecordBatch>>) {
227+
self.cache.insert(k, v);
228+
}
229+
230+
pub fn get(&self, k: &LogicalPlan) -> Option<&Vec<Vec<RecordBatch>>> {
231+
self.cache.get(k)
232+
}
233+
}

datafusion-examples/examples/dataframe/main.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
//! - `dataframe` — run a query using a DataFrame API against parquet files, csv files, and in-memory data, including multiple subqueries
3030
//! - `deserialize_to_struct` — convert query results (Arrow ArrayRefs) into Rust structs
3131
32+
mod cache_factory;
3233
mod dataframe;
3334
mod deserialize_to_struct;
3435

@@ -42,6 +43,7 @@ enum ExampleKind {
4243
All,
4344
Dataframe,
4445
DeserializeToStruct,
46+
CacheFactory,
4547
}
4648

4749
impl ExampleKind {
@@ -65,6 +67,9 @@ impl ExampleKind {
6567
ExampleKind::DeserializeToStruct => {
6668
deserialize_to_struct::deserialize_to_struct().await?;
6769
}
70+
ExampleKind::CacheFactory => {
71+
cache_factory::cache_dataframe_with_custom_logic().await?;
72+
}
6873
}
6974
Ok(())
7075
}

0 commit comments

Comments
 (0)