-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Improve performance of CASE WHEN x THEN y ELSE NULL expressions
#20097
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
0d64069
9124d37
0cf3156
7765229
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,6 +42,7 @@ use crate::expressions::case::literal_lookup_table::LiteralLookupTable; | |
| use arrow::compute::kernels::merge::{MergeIndex, merge, merge_n}; | ||
| use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; | ||
| use datafusion_physical_expr_common::datum::compare_with_eq; | ||
| use datafusion_physical_expr_common::utils::scatter; | ||
| use itertools::Itertools; | ||
| use std::fmt::{Debug, Formatter}; | ||
|
|
||
|
|
@@ -659,7 +660,7 @@ impl CaseExpr { | |
| && body.else_expr.as_ref().unwrap().as_any().is::<Literal>() | ||
| { | ||
| EvalMethod::ScalarOrScalar | ||
| } else if body.when_then_expr.len() == 1 && body.else_expr.is_some() { | ||
| } else if body.when_then_expr.len() == 1 { | ||
| EvalMethod::ExpressionOrExpression(body.project()?) | ||
| } else { | ||
| EvalMethod::NoExpression(body.project()?) | ||
|
|
@@ -961,32 +962,40 @@ impl CaseBody { | |
| let then_batch = filter_record_batch(batch, &when_filter)?; | ||
| let then_value = self.when_then_expr[0].1.evaluate(&then_batch)?; | ||
|
|
||
| let else_selection = not(&when_value)?; | ||
| let else_filter = create_filter(&else_selection, optimize_filter); | ||
| let else_batch = filter_record_batch(batch, &else_filter)?; | ||
|
|
||
| // keep `else_expr`'s data type and return type consistent | ||
| let e = self.else_expr.as_ref().unwrap(); | ||
| let return_type = self.data_type(&batch.schema())?; | ||
| let else_expr = try_cast(Arc::clone(e), &batch.schema(), return_type.clone()) | ||
| .unwrap_or_else(|_| Arc::clone(e)); | ||
|
|
||
| let else_value = else_expr.evaluate(&else_batch)?; | ||
|
|
||
| Ok(ColumnarValue::Array(match (then_value, else_value) { | ||
| (ColumnarValue::Array(t), ColumnarValue::Array(e)) => { | ||
| merge(&when_value, &t, &e) | ||
| } | ||
| (ColumnarValue::Scalar(t), ColumnarValue::Array(e)) => { | ||
| merge(&when_value, &t.to_scalar()?, &e) | ||
| } | ||
| (ColumnarValue::Array(t), ColumnarValue::Scalar(e)) => { | ||
| merge(&when_value, &t, &e.to_scalar()?) | ||
| match &self.else_expr { | ||
| None => { | ||
| let then_array = then_value.to_array(when_value.true_count())?; | ||
| scatter(&when_value, then_array.as_ref()).map(ColumnarValue::Array) | ||
| } | ||
| (ColumnarValue::Scalar(t), ColumnarValue::Scalar(e)) => { | ||
| merge(&when_value, &t.to_scalar()?, &e.to_scalar()?) | ||
| Some(else_expr) => { | ||
| let else_selection = not(&when_value)?; | ||
| let else_filter = create_filter(&else_selection, optimize_filter); | ||
| let else_batch = filter_record_batch(batch, &else_filter)?; | ||
|
|
||
| // keep `else_expr`'s data type and return type consistent | ||
| let return_type = self.data_type(&batch.schema())?; | ||
| let else_expr = | ||
| try_cast(Arc::clone(else_expr), &batch.schema(), return_type.clone()) | ||
| .unwrap_or_else(|_| Arc::clone(else_expr)); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why there is a fallback here ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a very good question. I think this is actually one of the few lines of code I did not modify. Let me see if I can figure out the answer from git history.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I traced this back to https://github.com/apache/datafusion/pull/1601/changes#diff-e27e1014e17614bf2ddd8b375475c5c31a279e58e97d2950a9a5427428645855R372 in PR #1601
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't remember the context either |
||
|
|
||
| let else_value = else_expr.evaluate(&else_batch)?; | ||
|
|
||
| Ok(ColumnarValue::Array(match (then_value, else_value) { | ||
| (ColumnarValue::Array(t), ColumnarValue::Array(e)) => { | ||
| merge(&when_value, &t, &e) | ||
| } | ||
| (ColumnarValue::Scalar(t), ColumnarValue::Array(e)) => { | ||
| merge(&when_value, &t.to_scalar()?, &e) | ||
| } | ||
| (ColumnarValue::Array(t), ColumnarValue::Scalar(e)) => { | ||
| merge(&when_value, &t, &e.to_scalar()?) | ||
| } | ||
| (ColumnarValue::Scalar(t), ColumnarValue::Scalar(e)) => { | ||
| merge(&when_value, &t.to_scalar()?, &e.to_scalar()?) | ||
| } | ||
| }?)) | ||
| } | ||
| }?)) | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1137,7 +1146,15 @@ impl CaseExpr { | |
| self.body.when_then_expr[0].1.evaluate(batch) | ||
| } else if true_count == 0 { | ||
| // All input rows are false/null, just call the 'else' expression | ||
| self.body.else_expr.as_ref().unwrap().evaluate(batch) | ||
| match &self.body.else_expr { | ||
| Some(else_expr) => else_expr.evaluate(batch), | ||
| None => { | ||
| let return_type = self.data_type(&batch.schema())?; | ||
| Ok(ColumnarValue::Scalar(ScalarValue::try_new_null( | ||
| &return_type, | ||
| )?)) | ||
| } | ||
| } | ||
| } else if projected.projection.len() < batch.num_columns() { | ||
| // The case expressions do not use all the columns of the input batch. | ||
| // Project first to reduce time spent filtering. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://github.com/pepijnve/datafusion/blob/0d640690aa665da749d30148e6a9c6440d2020db/datafusion/physical-expr/src/expressions/case.rs#L76 says
... and both thethenandelseare expressions. But this change makes the else part optional.Should ExpressionOrExpression's docstring be updated ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I should update the doc strings. I didn't check them to see if they got stale or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doc string updated