Skip to content

Commit aac786a

Browse files
authored
feat: Add Distinct Logical Node (#89)
1 parent 029c46a commit aac786a

File tree

10 files changed

+192
-42
lines changed

10 files changed

+192
-42
lines changed

datafusion-cli/Cargo.lock

Lines changed: 19 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/core/src/dataframe.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,30 @@ impl DataFrame {
235235
Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
236236
}
237237

238-
/// Calculate the union distinct two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema
238+
/// Calculate the distinct union of two [`DataFrame`]s. The
239+
/// two [`DataFrame`]s must have exactly the same schema
240+
///
241+
/// ```
242+
/// # use datafusion::prelude::*;
243+
/// # use datafusion::error::Result;
244+
/// # #[tokio::main]
245+
/// # async fn main() -> Result<()> {
246+
/// let ctx = SessionContext::new();
247+
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
248+
/// let df = df.union_distinct(df.clone())?;
249+
/// # Ok(())
250+
/// # }
251+
/// ```
252+
pub fn union_distinct(&self, dataframe: Arc<DataFrame>) -> Result<Arc<DataFrame>> {
253+
Ok(Arc::new(DataFrame::new(
254+
self.session_state.clone(),
255+
&LogicalPlanBuilder::from(self.plan.clone())
256+
.union_distinct(dataframe.plan.clone())?
257+
.build()?,
258+
)))
259+
}
260+
261+
/// Filter out duplicate rows
239262
///
240263
/// ```
241264
/// # use datafusion::prelude::*;

datafusion/core/src/logical_plan/builder.rs

Lines changed: 74 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,12 @@ use arrow::{
3838
use datafusion_data_access::object_store::ObjectStore;
3939
use datafusion_physical_expr::coercion_rule::binary_rule::comparison_eq_coercion;
4040
use std::convert::TryFrom;
41-
use std::iter;
4241
use std::{
4342
collections::{HashMap, HashSet},
4443
sync::Arc,
4544
};
4645

47-
use super::{dfschema::ToDFSchema, expr_rewriter::coerce_plan_expr_for_schema};
46+
use super::{dfschema::ToDFSchema, expr_rewriter::coerce_plan_expr_for_schema, Distinct};
4847
use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
4948
use crate::logical_plan::{
5049
columnize_expr, normalize_col, normalize_cols, rewrite_sort_cols_by_aggs, Column,
@@ -649,18 +648,34 @@ impl LogicalPlanBuilder {
649648
})))
650649
}
651650

652-
/// Apply a union
651+
/// Apply a union, preserving duplicate rows
653652
pub fn union(&self, plan: LogicalPlan) -> Result<Self> {
654653
Ok(Self::from(union_with_alias(self.plan.clone(), plan, None)?))
655654
}
656655

656+
/// Apply a union, removing duplicate rows
657+
pub fn union_distinct(&self, plan: LogicalPlan) -> Result<Self> {
658+
// unwrap top-level Distincts, to avoid duplication
659+
let left_plan = self.plan.clone();
660+
let left_plan: LogicalPlan = match left_plan {
661+
LogicalPlan::Distinct(Distinct { input }) => (*input).clone(),
662+
_ => left_plan,
663+
};
664+
let right_plan: LogicalPlan = match plan {
665+
LogicalPlan::Distinct(Distinct { input }) => (*input).clone(),
666+
_ => plan,
667+
};
668+
669+
Ok(Self::from(LogicalPlan::Distinct(Distinct {
670+
input: Arc::new(union_with_alias(left_plan, right_plan, None)?),
671+
})))
672+
}
673+
657674
/// Apply deduplication: Only distinct (different) values are returned)
658675
pub fn distinct(&self) -> Result<Self> {
659-
let projection_expr = expand_wildcard(self.plan.schema(), &self.plan)?;
660-
let plan = LogicalPlanBuilder::from(self.plan.clone())
661-
.aggregate(projection_expr, iter::empty::<Expr>())?
662-
.build()?;
663-
Self::from(plan).project(vec![Expr::Wildcard])
676+
Ok(Self::from(LogicalPlan::Distinct(Distinct {
677+
input: Arc::new(self.plan.clone()),
678+
})))
664679
}
665680

666681
/// Apply a join with on constraint
@@ -1402,6 +1417,57 @@ mod tests {
14021417
Ok(())
14031418
}
14041419

1420+
#[test]
1421+
fn plan_builder_union_distinct_combined_single_union() -> Result<()> {
1422+
let plan = LogicalPlanBuilder::scan_empty(
1423+
Some("employee_csv"),
1424+
&employee_schema(),
1425+
Some(vec![3, 4]),
1426+
)?;
1427+
1428+
let plan = plan
1429+
.union_distinct(plan.build()?)?
1430+
.union_distinct(plan.build()?)?
1431+
.union_distinct(plan.build()?)?
1432+
.build()?;
1433+
1434+
// output has only one union
1435+
let expected = "\
1436+
Distinct:\
1437+
\n Union\
1438+
\n TableScan: employee_csv projection=Some([3, 4])\
1439+
\n TableScan: employee_csv projection=Some([3, 4])\
1440+
\n TableScan: employee_csv projection=Some([3, 4])\
1441+
\n TableScan: employee_csv projection=Some([3, 4])";
1442+
1443+
assert_eq!(expected, format!("{:?}", plan));
1444+
1445+
Ok(())
1446+
}
1447+
1448+
#[test]
1449+
fn plan_builder_simple_distinct() -> Result<()> {
1450+
let plan = LogicalPlanBuilder::scan_empty(
1451+
Some("employee_csv"),
1452+
&employee_schema(),
1453+
Some(vec![0, 3]),
1454+
)?
1455+
.filter(col("state").eq(lit("CO")))?
1456+
.project(vec![col("id")])?
1457+
.distinct()?
1458+
.build()?;
1459+
1460+
let expected = "\
1461+
Distinct:\
1462+
\n Projection: #employee_csv.id\
1463+
\n Filter: #employee_csv.state = Utf8(\"CO\")\
1464+
\n TableScan: employee_csv projection=Some([0, 3])";
1465+
1466+
assert_eq!(expected, format!("{:?}", plan));
1467+
1468+
Ok(())
1469+
}
1470+
14051471
#[test]
14061472
fn projection_non_unique_names() -> Result<()> {
14071473
let plan = LogicalPlanBuilder::scan_empty(

datafusion/core/src/logical_plan/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@ pub use expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
6565
pub use extension::UserDefinedLogicalNode;
6666
pub use operators::Operator;
6767
pub use plan::{
68-
CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable,
69-
EmptyRelation, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
70-
PlanVisitor, Repartition, StringifiedPlan, Subquery, TableScan, ToStringifiedPlan,
71-
Union, Values,
68+
CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CrossJoin, Distinct,
69+
DropTable, EmptyRelation, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning,
70+
PlanType, PlanVisitor, Repartition, StringifiedPlan, Subquery, TableScan,
71+
ToStringifiedPlan, Union, Values,
7272
};
7373
pub use registry::FunctionRegistry;

datafusion/core/src/logical_plan/plan.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,13 @@ pub struct Values {
297297
pub values: Vec<Vec<Expr>>,
298298
}
299299

300+
/// Removes duplicate rows from the input
301+
#[derive(Clone)]
302+
pub struct Distinct {
303+
/// The logical plan that is being DISTINCT'd
304+
pub input: Arc<LogicalPlan>,
305+
}
306+
300307
/// Aggregates its input based on a set of grouping and aggregate
301308
/// expressions (e.g. SUM).
302309
#[derive(Clone)]
@@ -417,6 +424,8 @@ pub enum LogicalPlan {
417424
TableUDFs(TableUDFs),
418425
/// Extension operator defined outside of DataFusion
419426
Extension(Extension),
427+
/// Remove duplicate rows from the input
428+
Distinct(Distinct),
420429
}
421430

422431
impl LogicalPlan {
@@ -431,6 +440,7 @@ impl LogicalPlan {
431440
LogicalPlan::Projection(Projection { schema, .. }) => schema,
432441
LogicalPlan::Subquery(Subquery { schema, .. }) => schema,
433442
LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
443+
LogicalPlan::Distinct(Distinct { input }) => input.schema(),
434444
LogicalPlan::Window(Window { schema, .. }) => schema,
435445
LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
436446
LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
@@ -504,6 +514,7 @@ impl LogicalPlan {
504514
| LogicalPlan::Sort(Sort { input, .. })
505515
| LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
506516
| LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
517+
LogicalPlan::Distinct(Distinct { input, .. }) => input.all_schemas(),
507518
LogicalPlan::DropTable(_) => vec![],
508519
}
509520
}
@@ -558,7 +569,8 @@ impl LogicalPlan {
558569
| LogicalPlan::Analyze { .. }
559570
| LogicalPlan::Explain { .. }
560571
| LogicalPlan::Subquery(_)
561-
| LogicalPlan::Union(_) => {
572+
| LogicalPlan::Union(_)
573+
| LogicalPlan::Distinct(_) => {
562574
vec![]
563575
}
564576
}
@@ -585,6 +597,7 @@ impl LogicalPlan {
585597
LogicalPlan::Limit(Limit { input, .. }) => vec![input],
586598
LogicalPlan::Extension(extension) => extension.node.inputs(),
587599
LogicalPlan::Union(Union { inputs, .. }) => inputs.iter().collect(),
600+
LogicalPlan::Distinct(Distinct { input }) => vec![input],
588601
LogicalPlan::Explain(explain) => vec![&explain.plan],
589602
LogicalPlan::Analyze(analyze) => vec![&analyze.input],
590603
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
@@ -741,6 +754,7 @@ impl LogicalPlan {
741754
}
742755
true
743756
}
757+
LogicalPlan::Distinct(Distinct { input }) => input.accept(visitor)?,
744758
LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?,
745759
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
746760
input.accept(visitor)?
@@ -1157,6 +1171,9 @@ impl LogicalPlan {
11571171
}) => {
11581172
write!(f, "DropTable: {:?} if not exist:={}", name, if_exists)
11591173
}
1174+
LogicalPlan::Distinct(Distinct { .. }) => {
1175+
write!(f, "Distinct:")
1176+
}
11601177
LogicalPlan::Explain { .. } => write!(f, "Explain"),
11611178
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
11621179
LogicalPlan::Union(_) => write!(f, "Union"),

datafusion/core/src/optimizer/common_subexpr_eliminate.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ fn optimize(
249249
| LogicalPlan::CreateCatalogSchema(_)
250250
| LogicalPlan::DropTable(_)
251251
| LogicalPlan::Subquery(_)
252+
| LogicalPlan::Distinct(_)
252253
| LogicalPlan::Extension { .. } => {
253254
// apply the optimization to all inputs of the plan
254255
let expr = plan.expressions();

datafusion/core/src/optimizer/projection_push_down.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,7 @@ fn optimize_plan(
505505
| LogicalPlan::DropTable(_)
506506
| LogicalPlan::CrossJoin(_)
507507
| LogicalPlan::TableUDFs(_)
508+
| LogicalPlan::Distinct(_)
508509
| LogicalPlan::Extension { .. } => {
509510
let expr = plan.expressions();
510511
// collect all required columns by this plan

datafusion/core/src/optimizer/utils.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ use crate::optimizer::optimizer::OptimizerConfig;
2626

2727
use crate::logical_plan::builder::build_table_udf_schema;
2828
use crate::logical_plan::{
29-
build_join_schema, Column, CreateMemoryTable, DFSchemaRef, Expr, ExprVisitable,
30-
Limit, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, Recursion,
31-
Repartition, Union, Values,
29+
build_join_schema, Column, CreateMemoryTable, DFSchemaRef, Distinct, Expr,
30+
ExprVisitable, Limit, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning,
31+
Recursion, Repartition, Union, Values,
3232
};
3333
use crate::prelude::lit;
3434
use crate::scalar::ScalarValue;
@@ -264,6 +264,9 @@ pub fn from_plan(
264264
alias: alias.clone(),
265265
}))
266266
}
267+
LogicalPlan::Distinct(Distinct { .. }) => Ok(LogicalPlan::Distinct(Distinct {
268+
input: Arc::new(inputs[0].clone()),
269+
})),
267270
LogicalPlan::Analyze(a) => {
268271
assert!(expr.is_empty());
269272
assert_eq!(inputs.len(), 1);

datafusion/core/src/physical_plan/planner.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,15 @@ use super::{
2424
hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows,
2525
};
2626
use crate::execution::context::{ExecutionProps, SessionState};
27+
use crate::logical_plan::builder::expand_wildcard;
2728
use crate::logical_plan::plan::{
2829
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Subquery, TableScan,
2930
TableUDFs, Window,
3031
};
3132
use crate::logical_plan::{
32-
unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator,
33-
Partitioning as LogicalPartitioning, PlanType, Repartition, ToStringifiedPlan, Union,
34-
UserDefinedLogicalNode,
33+
unalias, unnormalize_cols, CrossJoin, DFSchema, Distinct, Expr, LogicalPlan,
34+
Operator, Partitioning as LogicalPartitioning, PlanType, Repartition,
35+
ToStringifiedPlan, Union, UserDefinedLogicalNode,
3536
};
3637
use crate::logical_plan::{Limit, Values};
3738
use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
@@ -616,6 +617,18 @@ impl DefaultPhysicalPlanner {
616617
input_exec,
617618
)?))
618619
}
620+
LogicalPlan::Distinct(Distinct {input}) => {
621+
// Convert distinct to groupby with no aggregations
622+
let group_expr = expand_wildcard(input.schema(), input)?;
623+
let aggregate = LogicalPlan::Aggregate(Aggregate {
624+
input: input.clone(),
625+
group_expr,
626+
aggr_expr: vec![],
627+
schema: input.schema().clone()
628+
}
629+
);
630+
Ok(self.create_initial_plan(&aggregate, session_state).await?)
631+
}
619632
LogicalPlan::Projection(Projection { input, expr, .. }) => {
620633
let input_exec = self.create_initial_plan(input, session_state).await?;
621634
let input_schema = input.as_ref().schema();

0 commit comments

Comments
 (0)