Skip to content

Commit a30cf37

Browse files
Add relation planner extension support to customize SQL planning (#17843)
## Which issue does this PR close? - Closes #18078. - Closes #17633 - Closes #13563 - Closes #17824 ## Rationale for this change DataFusion currently lacks a clean way to customize how SQL table factors (FROM clause elements) are planned into logical plans. The proposed workaround in #17633 has a critical limitation: it only works at the query root level and cannot handle custom relations inside JOINs, CTEs, or subqueries. This PR introduces a `RelationPlanner` extension API that allows users to intercept and customize table factor planning at any nesting level, enabling support for SQL syntax extensions that go beyond simple table-valued functions. For example, you can now combine multiple custom relation types in a single query: ```rust let ctx = SessionContext::new(); // Register custom planners for SQL syntax extensions ctx.register_relation_planner(Arc::new(TableSamplePlanner))?; ctx.register_relation_planner(Arc::new(MatchRecognizePlanner))?; ctx.register_relation_planner(Arc::new(PivotUnpivotPlanner))?; // Use multiple custom table modifiers together - even in nested JOINs or CTEs let df = ctx.sql(r#" WITH sampled_data AS ( SELECT * FROM stock_prices TABLESAMPLE BERNOULLI(10 PERCENT) REPEATABLE(42) ) SELECT symbol, quarter, price FROM sampled_data MATCH_RECOGNIZE ( PARTITION BY symbol ORDER BY time MEASURES LAST(price) AS price, quarter PATTERN (UP+ DOWN+) DEFINE UP AS price > PREV(price), DOWN AS price < PREV(price) ) AS patterns PIVOT ( AVG(price) FOR quarter IN ('Q1', 'Q2', 'Q3', 'Q4') ) AS pivoted "#).await?; df.show().await?; ``` **Why not use `TableFunctionImpl`?** The existing `TableFunctionImpl` trait is perfect for simple table-valued functions (like `generate_series(1, 10)`), but it cannot handle: - SQL clause modifiers (e.g., `TABLESAMPLE` that modifies an existing table reference) - New table factor syntaxes (e.g., `MATCH_RECOGNIZE`, `PIVOT`, `UNPIVOT`) - Complex syntax that doesn't follow the function call pattern `RelationPlanner` fills this gap by intercepting arbitrary `TableFactor` AST nodes and transforming them into logical plans. ## What changes are included in this PR? **Core API (feat commit):** - New `RelationPlanner` trait for customizing SQL table factor planning - `RelationPlannerContext` trait providing SQL utilities to extension planners - `SessionContext::register_relation_planner()` for registering custom planners - `SessionState` integration with priority-based planner chain - Integration into `SqlToRel` to invoke planners at all nesting levels (not just root) **Tests (test commit):** - Comprehensive tests in `datafusion/core/tests/user_defined/relation_planner.rs` - Coverage for basic planner registration, priority ordering, and nested relations - Tests demonstrating custom table factors and syntax extensions **Examples (example commit):** - `table_sample.rs` - SQL TABLESAMPLE clause support (BERNOULLI/SYSTEM methods, REPEATABLE seed), adapted from #17633. - `match_recognize.rs` - SQL MATCH_RECOGNIZE pattern matching on event streams - `pivot_unpivot.rs` - SQL PIVOT/UNPIVOT transformations for data reshaping **Note:** The examples are intentionally verbose to demonstrate the full design and capabilities of the API. They should be simplified and streamlined before merging as there is no need for three different examples of the same feature: the `PIVOT/UNPIVOT` example is probably more than enough. ## Are these changes tested? Yes: - Unit tests for planner registration, priority ordering, and chaining - Integration tests demonstrating nested relation handling (JOINs, CTEs, subqueries) - Example programs serve as additional end-to-end tests - All examples include multiple test cases showing different usage patterns - Examples demonstrate syntax that cannot be implemented with `TableFunctionImpl` ## Are there any user-facing changes? Yes, this is a new public API: **New APIs:** - `datafusion_expr::planner::RelationPlanner` trait - `datafusion_expr::planner::RelationPlannerContext` trait - `datafusion_expr::planner::PlannedRelation` struct - `datafusion_expr::planner::RelationPlanning` enum - `SessionContext::register_relation_planner()` - `SessionState::register_relation_planner()` and `relation_planners()` - `SessionStateBuilder::with_relation_planners()` - `ContextProvider::get_relation_planners()` This is an additive change that extends existing extensibility APIs (`ExprPlanner`, `TypePlanner`) and requires the `sql` feature flag. ## AI-Generated Code Disclosure This PR was developed with significant assistance from `claude-sonnet-4.5`. The AI was heavily involved in all parts of the process, from initial design to actual code to writing the PR description, which greatly sped up development. All its output was however carefully reviewed before submitting. --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 7ea5066 commit a30cf37

File tree

13 files changed

+2755
-9
lines changed

13 files changed

+2755
-9
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-examples/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,14 @@ dashmap = { workspace = true }
4646
# note only use main datafusion crate for examples
4747
base64 = "0.22.1"
4848
datafusion = { workspace = true, default-features = true, features = ["parquet_encryption"] }
49+
datafusion-common = { workspace = true }
50+
datafusion-expr = { workspace = true }
4951
datafusion-physical-expr-adapter = { workspace = true }
5052
datafusion-proto = { workspace = true }
53+
datafusion-sql = { workspace = true }
5154
env_logger = { workspace = true }
5255
futures = { workspace = true }
56+
insta = { workspace = true }
5357
log = { workspace = true }
5458
mimalloc = { version = "0.1", default-features = false }
5559
object_store = { workspace = true, features = ["aws", "http"] }

datafusion-examples/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ cargo run --example dataframe -- dataframe
8686
- [`examples/external_dependency/query_aws_s3.rs`](examples/external_dependency/query_aws_s3.rs): Configure `object_store` and run a query against files stored in AWS S3
8787
- [`examples/data_io/query_http_csv.rs`](examples/data_io/query_http_csv.rs): Configure `object_store` and run a query against files via HTTP
8888
- [`examples/builtin_functions/regexp.rs`](examples/builtin_functions/regexp.rs): Examples of using regular expression functions
89+
- [`examples/relation_planner/match_recognize.rs`](examples/relation_planner/match_recognize.rs): Use custom relation planner to implement MATCH_RECOGNIZE pattern matching
90+
- [`examples/relation_planner/pivot_unpivot.rs`](examples/relation_planner/pivot_unpivot.rs): Use custom relation planner to implement PIVOT and UNPIVOT operations
91+
- [`examples/relation_planner/table_sample.rs`](examples/relation_planner/table_sample.rs): Use custom relation planner to implement TABLESAMPLE clause
8992
- [`examples/data_io/remote_catalog.rs`](examples/data_io/remote_catalog.rs): Examples of interfacing with a remote catalog (e.g. over a network)
9093
- [`examples/udf/simple_udaf.rs`](examples/udf/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF)
9194
- [`examples/udf/simple_udf.rs`](examples/udf/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF)
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! # Relation Planner Examples
19+
//!
20+
//! These examples demonstrate how to use custom relation planners to extend
21+
//! DataFusion's SQL syntax with custom table operators.
22+
//!
23+
//! ## Usage
24+
//! ```bash
25+
//! cargo run --example relation_planner -- [match_recognize|pivot_unpivot|table_sample]
26+
//! ```
27+
//!
28+
//! Each subcommand runs a corresponding example:
29+
//! - `match_recognize` — MATCH_RECOGNIZE pattern matching on event streams
30+
//! - `pivot_unpivot` — PIVOT and UNPIVOT operations for reshaping data
31+
//! - `table_sample` — TABLESAMPLE clause for sampling rows from tables
32+
//!
33+
//! ## Snapshot Testing
34+
//!
35+
//! These examples use [insta](https://insta.rs) for inline snapshot assertions.
36+
//! If query output changes, regenerate the snapshots with:
37+
//! ```bash
38+
//! cargo insta test --example relation_planner --accept
39+
//! ```
40+
41+
mod match_recognize;
42+
mod pivot_unpivot;
43+
mod table_sample;
44+
45+
use std::str::FromStr;
46+
47+
use datafusion::error::{DataFusionError, Result};
48+
49+
enum ExampleKind {
50+
MatchRecognize,
51+
PivotUnpivot,
52+
TableSample,
53+
}
54+
55+
impl AsRef<str> for ExampleKind {
56+
fn as_ref(&self) -> &str {
57+
match self {
58+
Self::MatchRecognize => "match_recognize",
59+
Self::PivotUnpivot => "pivot_unpivot",
60+
Self::TableSample => "table_sample",
61+
}
62+
}
63+
}
64+
65+
impl FromStr for ExampleKind {
66+
type Err = DataFusionError;
67+
68+
fn from_str(s: &str) -> Result<Self> {
69+
match s {
70+
"match_recognize" => Ok(Self::MatchRecognize),
71+
"pivot_unpivot" => Ok(Self::PivotUnpivot),
72+
"table_sample" => Ok(Self::TableSample),
73+
_ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))),
74+
}
75+
}
76+
}
77+
78+
impl ExampleKind {
79+
const ALL: [Self; 3] = [Self::MatchRecognize, Self::PivotUnpivot, Self::TableSample];
80+
81+
const EXAMPLE_NAME: &str = "relation_planner";
82+
83+
fn variants() -> Vec<&'static str> {
84+
Self::ALL.iter().map(|x| x.as_ref()).collect()
85+
}
86+
}
87+
88+
#[tokio::main]
89+
async fn main() -> Result<()> {
90+
let usage = format!(
91+
"Usage: cargo run --example {} -- [{}]",
92+
ExampleKind::EXAMPLE_NAME,
93+
ExampleKind::variants().join("|")
94+
);
95+
96+
let arg = std::env::args().nth(1).ok_or_else(|| {
97+
eprintln!("{usage}");
98+
DataFusionError::Execution("Missing argument".to_string())
99+
})?;
100+
101+
if arg == "all" {
102+
for example in ExampleKind::ALL {
103+
match example {
104+
ExampleKind::MatchRecognize => match_recognize::match_recognize().await?,
105+
ExampleKind::PivotUnpivot => pivot_unpivot::pivot_unpivot().await?,
106+
ExampleKind::TableSample => table_sample::table_sample().await?,
107+
}
108+
}
109+
} else {
110+
match arg.parse::<ExampleKind>()? {
111+
ExampleKind::MatchRecognize => match_recognize::match_recognize().await?,
112+
ExampleKind::PivotUnpivot => pivot_unpivot::pivot_unpivot().await?,
113+
ExampleKind::TableSample => table_sample::table_sample().await?,
114+
}
115+
}
116+
117+
Ok(())
118+
}
119+
120+
/// Test wrappers that enable `cargo insta test --example relation_planner --accept`
121+
/// to regenerate inline snapshots. Without these, insta cannot run the examples
122+
/// in test mode since they only have `main()` functions.
123+
#[cfg(test)]
124+
mod tests {
125+
use super::*;
126+
127+
#[tokio::test]
128+
async fn test_match_recognize() {
129+
match_recognize::match_recognize().await.unwrap();
130+
}
131+
132+
#[tokio::test]
133+
async fn test_pivot_unpivot() {
134+
pivot_unpivot::pivot_unpivot().await.unwrap();
135+
}
136+
137+
#[tokio::test]
138+
async fn test_table_sample() {
139+
table_sample::table_sample().await.unwrap();
140+
}
141+
}

0 commit comments

Comments
 (0)