Skip to content

Commit a7b113c

Browse files
authored
Support AS, UNION, INTERSECTION, EXCEPT, AGGREGATE pipe operators (#17312)
* support WHERE pipe operator * support order by * support limit * select pipe * extend support * document supported pipe operators in user guide * fmt * fix where pipe before extend * support AS * support union * support intersection * support except * support aggregate * revert diff from main * simplify using mut * remove useless comments * remove dummy data * move docs to select.md * simplify using alias_if_changed * deduplicate fn * add aggregate toc * revert parquet testing
1 parent 71512e6 commit a7b113c

File tree

3 files changed

+290
-2
lines changed

3 files changed

+290
-2
lines changed

datafusion/sql/src/query.rs

Lines changed: 111 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ use datafusion_expr::{
2828
CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder,
2929
};
3030
use sqlparser::ast::{
31-
Expr as SQLExpr, Ident, LimitClause, Offset, OffsetRows, OrderBy, OrderByExpr,
32-
OrderByKind, PipeOperator, Query, SelectInto, SetExpr,
31+
Expr as SQLExpr, ExprWithAliasAndOrderBy, Ident, LimitClause, Offset, OffsetRows,
32+
OrderBy, OrderByExpr, OrderByKind, PipeOperator, Query, SelectInto, SetExpr,
33+
SetOperator, SetQuantifier, TableAlias,
3334
};
3435
use sqlparser::tokenizer::Span;
3536

@@ -146,11 +147,80 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
146147
.collect();
147148
self.project(plan, all_exprs)
148149
}
150+
PipeOperator::As { alias } => self.apply_table_alias(
151+
plan,
152+
TableAlias {
153+
name: alias,
154+
// Apply to all fields
155+
columns: vec![],
156+
},
157+
),
158+
PipeOperator::Union {
159+
set_quantifier,
160+
queries,
161+
} => self.pipe_operator_set(
162+
plan,
163+
SetOperator::Union,
164+
set_quantifier,
165+
queries,
166+
planner_context,
167+
),
168+
PipeOperator::Intersect {
169+
set_quantifier,
170+
queries,
171+
} => self.pipe_operator_set(
172+
plan,
173+
SetOperator::Intersect,
174+
set_quantifier,
175+
queries,
176+
planner_context,
177+
),
178+
PipeOperator::Except {
179+
set_quantifier,
180+
queries,
181+
} => self.pipe_operator_set(
182+
plan,
183+
SetOperator::Except,
184+
set_quantifier,
185+
queries,
186+
planner_context,
187+
),
188+
PipeOperator::Aggregate {
189+
full_table_exprs,
190+
group_by_expr,
191+
} => self.pipe_operator_aggregate(
192+
plan,
193+
full_table_exprs,
194+
group_by_expr,
195+
planner_context,
196+
),
149197

150198
x => not_impl_err!("`{x}` pipe operator is not supported yet"),
151199
}
152200
}
153201

202+
/// Handle Union/Intersect/Except pipe operators
203+
fn pipe_operator_set(
204+
&self,
205+
mut plan: LogicalPlan,
206+
set_operator: SetOperator,
207+
set_quantifier: SetQuantifier,
208+
queries: Vec<Query>,
209+
planner_context: &mut PlannerContext,
210+
) -> Result<LogicalPlan> {
211+
for query in queries {
212+
let right_plan = self.query_to_plan(query, planner_context)?;
213+
plan = self.set_operation_to_plan(
214+
set_operator,
215+
plan,
216+
right_plan,
217+
set_quantifier,
218+
)?;
219+
}
220+
221+
Ok(plan)
222+
}
223+
154224
/// Wrap a plan in a limit
155225
fn limit(
156226
&self,
@@ -227,6 +297,45 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
227297
}
228298
}
229299

300+
/// Handle AGGREGATE pipe operator
301+
fn pipe_operator_aggregate(
302+
&self,
303+
plan: LogicalPlan,
304+
full_table_exprs: Vec<ExprWithAliasAndOrderBy>,
305+
group_by_expr: Vec<ExprWithAliasAndOrderBy>,
306+
planner_context: &mut PlannerContext,
307+
) -> Result<LogicalPlan> {
308+
let plan_schema = plan.schema();
309+
let process_expr =
310+
|expr_with_alias_and_order_by: ExprWithAliasAndOrderBy,
311+
planner_context: &mut PlannerContext| {
312+
let expr_with_alias = expr_with_alias_and_order_by.expr;
313+
let sql_expr = expr_with_alias.expr;
314+
let alias = expr_with_alias.alias;
315+
316+
let df_expr = self.sql_to_expr(sql_expr, plan_schema, planner_context)?;
317+
318+
match alias {
319+
Some(alias_ident) => df_expr.alias_if_changed(alias_ident.value),
320+
None => Ok(df_expr),
321+
}
322+
};
323+
324+
let aggr_exprs: Vec<Expr> = full_table_exprs
325+
.into_iter()
326+
.map(|e| process_expr(e, planner_context))
327+
.collect::<Result<Vec<_>>>()?;
328+
329+
let group_by_exprs: Vec<Expr> = group_by_expr
330+
.into_iter()
331+
.map(|e| process_expr(e, planner_context))
332+
.collect::<Result<Vec<_>>>()?;
333+
334+
LogicalPlanBuilder::from(plan)
335+
.aggregate(group_by_exprs, aggr_exprs)?
336+
.build()
337+
}
338+
230339
/// Wrap the logical plan in a `SelectInto`
231340
fn select_into(
232341
&self,

datafusion/sqllogictest/test_files/pipe_operator.slt

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,3 +89,91 @@ FROM test
8989
|> EXTEND a + b AS a_plus_b
9090
----
9191
1 1.1 2.1
92+
93+
# AS pipe
94+
query I
95+
SELECT *
96+
FROM test
97+
|> as test_pipe
98+
|> select test_pipe.a
99+
----
100+
1
101+
2
102+
3
103+
104+
# UNION pipe
105+
query I
106+
SELECT *
107+
FROM test
108+
|> select a
109+
|> UNION ALL (
110+
SELECT a FROM test
111+
);
112+
----
113+
1
114+
2
115+
3
116+
1
117+
2
118+
3
119+
120+
# INTERSECT pipe
121+
query I rowsort
122+
SELECT * FROM range(0,3)
123+
|> INTERSECT DISTINCT
124+
(SELECT * FROM range(1,3));
125+
----
126+
1
127+
2
128+
129+
# EXCEPT pipe
130+
query I rowsort
131+
select * from range(0,10)
132+
|> EXCEPT DISTINCT (select * from range(5,10));
133+
----
134+
0
135+
1
136+
2
137+
3
138+
4
139+
140+
# AGGREGATE pipe
141+
query II
142+
(
143+
SELECT 'apples' AS item, 2 AS sales
144+
UNION ALL
145+
SELECT 'bananas' AS item, 5 AS sales
146+
UNION ALL
147+
SELECT 'apples' AS item, 7 AS sales
148+
)
149+
|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales;
150+
----
151+
3 14
152+
153+
query TII rowsort
154+
(
155+
SELECT 'apples' AS item, 2 AS sales
156+
UNION ALL
157+
SELECT 'bananas' AS item, 5 AS sales
158+
UNION ALL
159+
SELECT 'apples' AS item, 7 AS sales
160+
)
161+
|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales
162+
GROUP BY item;
163+
----
164+
apples 2 9
165+
bananas 1 5
166+
167+
query TII rowsort
168+
(
169+
SELECT 'apples' AS item, 2 AS sales
170+
UNION ALL
171+
SELECT 'bananas' AS item, 5 AS sales
172+
UNION ALL
173+
SELECT 'apples' AS item, 7 AS sales
174+
)
175+
|> AGGREGATE COUNT(*) AS num_items, SUM(sales) AS total_sales
176+
GROUP BY item
177+
|> WHERE num_items > 1;
178+
----
179+
apples 2 9

docs/source/user-guide/sql/select.md

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,11 @@ DataFusion currently supports the following pipe operators:
345345
- [LIMIT](#pipe_limit)
346346
- [SELECT](#pipe_select)
347347
- [EXTEND](#pipe_extend)
348+
- [AS](#pipe_as)
349+
- [UNION](#pipe_union)
350+
- [INTERSECT](#pipe_intersect)
351+
- [EXCEPT](#pipe_except)
352+
- [AGGREGATE](#pipe_aggregate)
348353

349354
(pipe_where)=
350355

@@ -423,3 +428,89 @@ select * from range(0,3)
423428
| 2 | -2 |
424429
+-------+-------------+
425430
```
431+
432+
(pipe_as)=
433+
434+
### AS
435+
436+
```sql
437+
select * from range(0,3)
438+
|> as my_range
439+
|> SELECT my_range.value;
440+
+-------+
441+
| value |
442+
+-------+
443+
| 0 |
444+
| 1 |
445+
| 2 |
446+
+-------+
447+
```
448+
449+
(pipe_union)=
450+
451+
### UNION
452+
453+
```sql
454+
select * from range(0,3)
455+
|> union all (
456+
select * from range(3,6)
457+
);
458+
+-------+
459+
| value |
460+
+-------+
461+
| 0 |
462+
| 1 |
463+
| 2 |
464+
| 3 |
465+
| 4 |
466+
| 5 |
467+
+-------+
468+
```
469+
470+
(pipe_intersect)=
471+
472+
### INTERSECT
473+
474+
```sql
475+
select * from range(0,100)
476+
|> INTERSECT DISTINCT (
477+
select 3
478+
);
479+
+-------+
480+
| value |
481+
+-------+
482+
| 3 |
483+
+-------+
484+
```
485+
486+
(pipe_except)=
487+
488+
### EXCEPT
489+
490+
```sql
491+
select * from range(0,10)
492+
|> EXCEPT DISTINCT (select * from range(5,10));
493+
+-------+
494+
| value |
495+
+-------+
496+
| 0 |
497+
| 1 |
498+
| 2 |
499+
| 3 |
500+
| 4 |
501+
+-------+
502+
```
503+
504+
(pipe_aggregate)=
505+
506+
### AGGREGATE
507+
508+
```sql
509+
select * from range(0,3)
510+
|> aggregate sum(value) AS total;
511+
+-------+
512+
| total |
513+
+-------+
514+
| 3 |
515+
+-------+
516+
```

0 commit comments

Comments
 (0)