diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 43e1f776bf..0fd28cb58e 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -132,6 +132,7 @@ jobs: org.apache.comet.exec.CometAggregateSuite org.apache.comet.exec.CometExec3_4PlusSuite org.apache.comet.exec.CometExecSuite + org.apache.comet.exec.CometGenerateExecSuite org.apache.comet.exec.CometWindowExecSuite org.apache.comet.exec.CometJoinSuite org.apache.comet.CometNativeSuite diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 88dfd9f92a..e915fa74a6 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -97,6 +97,7 @@ jobs: org.apache.comet.exec.CometAggregateSuite org.apache.comet.exec.CometExec3_4PlusSuite org.apache.comet.exec.CometExecSuite + org.apache.comet.exec.CometGenerateExecSuite org.apache.comet.exec.CometWindowExecSuite org.apache.comet.exec.CometJoinSuite org.apache.comet.CometNativeSuite diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 1e5d19ee23..3560fc07cb 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -270,6 +270,8 @@ object CometConf extends ShimCometConf { createExecEnabledConfig("union", defaultValue = true) val COMET_EXEC_EXPAND_ENABLED: ConfigEntry[Boolean] = createExecEnabledConfig("expand", defaultValue = true) + val COMET_EXEC_EXPLODE_ENABLED: ConfigEntry[Boolean] = + createExecEnabledConfig("explode", defaultValue = true) val COMET_EXEC_WINDOW_ENABLED: ConfigEntry[Boolean] = createExecEnabledConfig("window", defaultValue = true) val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] = diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index fe8ca2f9d2..fc8fd923f9 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -1,5 
+1,5 @@ diff --git a/pom.xml b/pom.xml -index d3544881af1..fbe1c4b9a87 100644 +index d3544881af1..07d1ed97925 100644 --- a/pom.xml +++ b/pom.xml @@ -148,6 +148,8 @@ @@ -1297,20 +1297,22 @@ index 4b3d3a4b805..56e1e0e6f16 100644 setupTestData() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala -index 9e9d717db3b..c1a7caf56e0 100644 +index 9e9d717db3b..ec73082f458 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala -@@ -17,7 +17,8 @@ +@@ -17,7 +17,10 @@ package org.apache.spark.sql.execution -import org.apache.spark.sql.{DataFrame, QueryTest, Row} ++import org.apache.comet.CometConf ++ +import org.apache.spark.sql.{DataFrame, IgnoreComet, QueryTest, Row} +import org.apache.spark.sql.comet.CometProjectExec import org.apache.spark.sql.connector.SimpleWritableDataSource import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} import org.apache.spark.sql.internal.SQLConf -@@ -34,7 +35,10 @@ abstract class RemoveRedundantProjectsSuiteBase +@@ -34,7 +37,10 @@ abstract class RemoveRedundantProjectsSuiteBase private def assertProjectExecCount(df: DataFrame, expected: Int): Unit = { withClue(df.queryExecution) { val plan = df.queryExecution.executedPlan @@ -1322,7 +1324,7 @@ index 9e9d717db3b..c1a7caf56e0 100644 assert(actual == expected) } } -@@ -112,7 +116,8 @@ abstract class RemoveRedundantProjectsSuiteBase +@@ -112,7 +118,8 @@ abstract class RemoveRedundantProjectsSuiteBase assertProjectExec(query, 1, 3) } @@ -1332,6 +1334,41 @@ index 9e9d717db3b..c1a7caf56e0 100644 val query = "select * from (select key, a, c, b from testView) as t1 join " + "(select key, a, b, c from testView) as t2 on t1.key = t2.key where 
t2.a > 50" assertProjectExec(query, 2, 2) +@@ -134,12 +141,21 @@ abstract class RemoveRedundantProjectsSuiteBase + val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0") + df.collect() + val plan = df.queryExecution.executedPlan +- val numProjects = collectWithSubqueries(plan) { case p: ProjectExec => p }.length ++ val numProjects = collectWithSubqueries(plan) { ++ case p: ProjectExec => p ++ case p: CometProjectExec => p ++ }.length ++ ++ // Comet-specific change to get original Spark plan before applying ++ // a transformation to add a new ProjectExec ++ var sparkPlan: SparkPlan = null ++ withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") { ++ val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0") ++ df.collect() ++ sparkPlan = df.queryExecution.executedPlan ++ } + +- // Create a new plan that reverse the GenerateExec output and add a new ProjectExec between +- // GenerateExec and its child. This is to test if the ProjectExec is removed, the output of +- // the query will be incorrect. +- val newPlan = stripAQEPlan(plan) transform { ++ val newPlan = stripAQEPlan(sparkPlan) transform { + case g @ GenerateExec(_, requiredChildOutput, _, _, child) => + g.copy(requiredChildOutput = requiredChildOutput.reverse, + child = ProjectExec(requiredChildOutput.reverse, child)) +@@ -151,6 +167,7 @@ abstract class RemoveRedundantProjectsSuiteBase + // The manually added ProjectExec node shouldn't be removed. + assert(collectWithSubqueries(newExecutedPlan) { + case p: ProjectExec => p ++ case p: CometProjectExec => p + }.size == numProjects + 1) + + // Check the original plan's output and the new plan's output are the same. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala index 30ce940b032..0d3f6c6c934 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala diff --git a/dev/diffs/3.5.7.diff b/dev/diffs/3.5.7.diff index a05e465754..3adc8c833c 100644 --- a/dev/diffs/3.5.7.diff +++ b/dev/diffs/3.5.7.diff @@ -1,5 +1,5 @@ diff --git a/pom.xml b/pom.xml -index a0e25ce4d8d..7db86212507 100644 +index a0e25ce4d8d..29d3b93f994 100644 --- a/pom.xml +++ b/pom.xml @@ -152,6 +152,8 @@ @@ -1274,18 +1274,21 @@ index de24b8c82b0..1f835481290 100644 setupTestData() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala -index 9e9d717db3b..91a4f9a38d5 100644 +index 9e9d717db3b..73de2b84938 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala -@@ -18,6 +18,7 @@ +@@ -17,7 +17,10 @@ + package org.apache.spark.sql.execution ++import org.apache.comet.CometConf ++ import org.apache.spark.sql.{DataFrame, QueryTest, Row} +import org.apache.spark.sql.comet.CometProjectExec import org.apache.spark.sql.connector.SimpleWritableDataSource import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} import org.apache.spark.sql.internal.SQLConf -@@ -34,7 +35,10 @@ abstract class RemoveRedundantProjectsSuiteBase +@@ -34,7 +37,10 @@ abstract class RemoveRedundantProjectsSuiteBase private def assertProjectExecCount(df: DataFrame, expected: Int): Unit = { withClue(df.queryExecution) { val plan = df.queryExecution.executedPlan @@ -1297,6 +1300,43 @@ index 9e9d717db3b..91a4f9a38d5 100644 
assert(actual == expected) } } +@@ -134,12 +140,26 @@ abstract class RemoveRedundantProjectsSuiteBase + val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0") + df.collect() + val plan = df.queryExecution.executedPlan +- val numProjects = collectWithSubqueries(plan) { case p: ProjectExec => p }.length ++ ++ val numProjects = collectWithSubqueries(plan) { ++ case p: ProjectExec => p ++ case p: CometProjectExec => p ++ }.length + + // Create a new plan that reverse the GenerateExec output and add a new ProjectExec between + // GenerateExec and its child. This is to test if the ProjectExec is removed, the output of + // the query will be incorrect. +- val newPlan = stripAQEPlan(plan) transform { ++ ++ // Comet-specific change to get original Spark plan before applying ++ // a transformation to add a new ProjectExec ++ var sparkPlan: SparkPlan = null ++ withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") { ++ val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0") ++ df.collect() ++ sparkPlan = df.queryExecution.executedPlan ++ } ++ ++ val newPlan = stripAQEPlan(sparkPlan) transform { + case g @ GenerateExec(_, requiredChildOutput, _, _, child) => + g.copy(requiredChildOutput = requiredChildOutput.reverse, + child = ProjectExec(requiredChildOutput.reverse, child)) +@@ -151,6 +171,7 @@ abstract class RemoveRedundantProjectsSuiteBase + // The manually added ProjectExec node shouldn't be removed. + assert(collectWithSubqueries(newExecutedPlan) { + case p: ProjectExec => p ++ case p: CometProjectExec => p + }.size == numProjects + 1) + + // Check the original plan's output and the new plan's output are the same. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala index 005e764cc30..92ec088efab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index e0394728fe..a9315db005 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -1,5 +1,5 @@ diff --git a/pom.xml b/pom.xml -index 22922143fc3..3c1f5d381ee 100644 +index 22922143fc3..477d4ec4194 100644 --- a/pom.xml +++ b/pom.xml @@ -148,6 +148,8 @@ @@ -1701,20 +1701,22 @@ index 47d5ff67b84..8dc8f65d4b1 100644 withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala -index b5bac8079c4..a3731888e12 100644 +index b5bac8079c4..9420dbdb936 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala -@@ -17,7 +17,8 @@ +@@ -17,7 +17,10 @@ package org.apache.spark.sql.execution -import org.apache.spark.sql.{DataFrame, QueryTest, Row} ++import org.apache.comet.CometConf ++ +import org.apache.spark.sql.{DataFrame, IgnoreComet, QueryTest, Row} +import org.apache.spark.sql.comet.CometProjectExec import org.apache.spark.sql.connector.SimpleWritableDataSource import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} import org.apache.spark.sql.internal.SQLConf -@@ -34,7 +35,10 @@ abstract class RemoveRedundantProjectsSuiteBase +@@ -34,7 +37,10 @@ abstract class RemoveRedundantProjectsSuiteBase private def assertProjectExecCount(df: 
DataFrame, expected: Int): Unit = { withClue(df.queryExecution) { val plan = df.queryExecution.executedPlan @@ -1726,7 +1728,7 @@ index b5bac8079c4..a3731888e12 100644 assert(actual == expected) } } -@@ -112,7 +116,8 @@ abstract class RemoveRedundantProjectsSuiteBase +@@ -112,7 +118,8 @@ abstract class RemoveRedundantProjectsSuiteBase assertProjectExec(query, 1, 3) } @@ -1736,6 +1738,42 @@ index b5bac8079c4..a3731888e12 100644 val query = "select * from (select key, a, c, b from testView) as t1 join " + "(select key, a, b, c from testView) as t2 on t1.key = t2.key where t2.a > 50" assertProjectExec(query, 2, 2) +@@ -134,12 +141,25 @@ abstract class RemoveRedundantProjectsSuiteBase + val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0") + df.collect() + val plan = df.queryExecution.executedPlan +- val numProjects = collectWithSubqueries(plan) { case p: ProjectExec => p }.length ++ val numProjects = collectWithSubqueries(plan) { ++ case p: ProjectExec => p ++ case p: CometProjectExec => p ++ }.length + + // Create a new plan that reverse the GenerateExec output and add a new ProjectExec between + // GenerateExec and its child. This is to test if the ProjectExec is removed, the output of + // the query will be incorrect. 
+- val newPlan = stripAQEPlan(plan) transform { ++ ++ // Comet-specific change to get original Spark plan before applying ++ // a transformation to add a new ProjectExec ++ var sparkPlan: SparkPlan = null ++ withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") { ++ val df = data.selectExpr("a", "b", "key", "explode(array(key, a, b)) as d").filter("d > 0") ++ df.collect() ++ sparkPlan = df.queryExecution.executedPlan ++ } ++ ++ val newPlan = stripAQEPlan(sparkPlan) transform { + case g @ GenerateExec(_, requiredChildOutput, _, _, child) => + g.copy(requiredChildOutput = requiredChildOutput.reverse, + child = ProjectExec(requiredChildOutput.reverse, child)) +@@ -151,6 +171,7 @@ abstract class RemoveRedundantProjectsSuiteBase + // The manually added ProjectExec node shouldn't be removed. + assert(collectWithSubqueries(newExecutedPlan) { + case p: ProjectExec => p ++ case p: CometProjectExec => p + }.size == numProjects + 1) + + // Check the original plan's output and the new plan's output are the same. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala index 005e764cc30..92ec088efab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index a1c3212c20..5f9cbc507d 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -162,6 +162,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.exec.coalesce.enabled` | Whether to enable coalesce by default. | true | | `spark.comet.exec.collectLimit.enabled` | Whether to enable collectLimit by default. | true | | `spark.comet.exec.expand.enabled` | Whether to enable expand by default. 
| true | +| `spark.comet.exec.explode.enabled` | Whether to enable explode by default. | true | | `spark.comet.exec.filter.enabled` | Whether to enable filter by default. | true | | `spark.comet.exec.globalLimit.enabled` | Whether to enable globalLimit by default. | true | | `spark.comet.exec.hashJoin.enabled` | Whether to enable hashJoin by default. | true | diff --git a/docs/source/user-guide/latest/operators.md b/docs/source/user-guide/latest/operators.md index f5f2d9724d..77ba84e4f7 100644 --- a/docs/source/user-guide/latest/operators.md +++ b/docs/source/user-guide/latest/operators.md @@ -30,6 +30,7 @@ not supported by Comet will fall back to regular Spark execution. | ExpandExec | Yes | | | FileSourceScanExec | Yes | Supports Parquet files. See the [Comet Compatibility Guide] for more information. | | FilterExec | Yes | | +| GenerateExec | Yes | Supports `explode` generator only. | | GlobalLimitExec | Yes | | | HashAggregateExec | Yes | | | InsertIntoHadoopFsRelationCommand | No | Experimental support for native Parquet writes. Disabled by default. 
| diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index b0746a6f84..c246d1fef6 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -96,9 +96,11 @@ use arrow::array::{ use arrow::buffer::{BooleanBuffer, NullBuffer, OffsetBuffer}; use arrow::row::{OwnedRow, RowConverter, SortField}; use datafusion::common::utils::SingleRowListArrayBuilder; +use datafusion::common::UnnestOptions; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::limit::GlobalLimitExec; +use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec}; use datafusion_comet_proto::spark_expression::ListLiteral; use datafusion_comet_proto::spark_operator::SparkFilePartition; use datafusion_comet_proto::{ @@ -1528,6 +1530,117 @@ impl PhysicalPlanner { Arc::new(SparkPlan::new(spark_plan.plan_id, expand, vec![child])), )) } + OpStruct::Explode(explode) => { + assert_eq!(children.len(), 1); + let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?; + + // Create the expression for the array to explode + let child_expr = if let Some(child_expr) = &explode.child { + self.create_expr(child_expr, child.schema())? + } else { + return Err(ExecutionError::GeneralError( + "Explode operator requires a child expression".to_string(), + )); + }; + + // Create projection expressions for other columns + let projections: Vec> = explode + .project_list + .iter() + .map(|expr| self.create_expr(expr, child.schema())) + .collect::, _>>()?; + + // For UnnestExec, we need to add a projection to put the columns in the right order: + // 1. First add all projection columns + // 2. 
Then add the array column to be exploded + // Then UnnestExec will unnest the last column + + // Use return_field() to get the proper column names from the expressions + let child_schema = child.schema(); + let mut project_exprs: Vec<(Arc, String)> = projections + .iter() + .map(|expr| { + let field = expr + .return_field(&child_schema) + .expect("Failed to get field from expression"); + let name = field.name().to_string(); + (Arc::clone(expr), name) + }) + .collect(); + + // Add the array column as the last column + let array_field = child_expr + .return_field(&child_schema) + .expect("Failed to get field from array expression"); + let array_col_name = array_field.name().to_string(); + project_exprs.push((Arc::clone(&child_expr), array_col_name.clone())); + + // Create a projection to arrange columns as needed + let project_exec = Arc::new(ProjectionExec::try_new( + project_exprs, + Arc::clone(&child.native_plan), + )?); + + // Get the input schema from the projection + let project_schema = project_exec.schema(); + + // Build the output schema for UnnestExec + // The output schema replaces the list column with its element type + let mut output_fields: Vec = Vec::new(); + + // Add all projection columns (non-array columns) + for i in 0..projections.len() { + output_fields.push(project_schema.field(i).clone()); + } + + // Add the unnested array element field + // Extract the element type from the list/array type + let array_field = project_schema.field(projections.len()); + let element_type = match array_field.data_type() { + DataType::List(field) => field.data_type().clone(), + dt => { + return Err(ExecutionError::GeneralError(format!( + "Expected List type for explode, got {:?}", + dt + ))) + } + }; + + // The output column has the same name as the input array column + // but with the element type instead of the list type + output_fields.push(Field::new( + array_field.name(), + element_type, + true, // Element is nullable after unnesting + )); + + let 
output_schema = Arc::new(Schema::new(output_fields)); + + // Use UnnestExec to explode the last column (the array column) + // ListUnnest specifies which column to unnest and the depth (1 for single level) + let list_unnest = ListUnnest { + index_in_input_schema: projections.len(), // Index of the array column to unnest + depth: 1, // Unnest one level (explode single array) + }; + + let unnest_options = UnnestOptions { + preserve_nulls: explode.outer, + recursions: vec![], + }; + + let unnest_exec = Arc::new(UnnestExec::new( + project_exec, + vec![list_unnest], + vec![], // No struct columns to unnest + output_schema, + unnest_options, + )); + + Ok(( + scans, + Arc::new(SparkPlan::new(spark_plan.plan_id, unnest_exec, vec![child])), + )) + } OpStruct::SortMergeJoin(join) => { let (join_params, scans) = self.parse_join_parameters( inputs, diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index a958327099..6eb6edc6b3 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -50,6 +50,7 @@ message Operator { NativeScan native_scan = 111; IcebergScan iceberg_scan = 112; ParquetWriter parquet_writer = 113; + Explode explode = 114; } } @@ -253,6 +254,15 @@ message Expand { int32 num_expr_per_project = 3; } +message Explode { + // The array expression to explode into multiple rows + spark.spark_expression.Expr child = 1; + // Whether this is explode_outer (produces null row for empty/null arrays) + bool outer = 2; + // Expressions for other columns to project alongside the exploded values + repeated spark.spark_expression.Expr project_list = 3; +} + message HashJoin { repeated spark.spark_expression.Expr left_join_keys = 1; repeated spark.spark_expression.Expr right_join_keys = 2; diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 124188b64d..2a051b1a77 100644 --- 
a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -62,6 +62,7 @@ object CometExecRule { classOf[LocalLimitExec] -> CometLocalLimitExec, classOf[GlobalLimitExec] -> CometGlobalLimitExec, classOf[ExpandExec] -> CometExpandExec, + classOf[GenerateExec] -> CometExplodeExec, classOf[HashAggregateExec] -> CometHashAggregateExec, classOf[ObjectHashAggregateExec] -> CometObjectHashAggregateExec, classOf[BroadcastHashJoinExec] -> CometBroadcastHashJoinExec, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index c955f79d91..33e44ea813 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.comet import java.io.ByteArrayOutputStream +import java.util.Locale import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -29,7 +30,7 @@ import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, ExpressionSet, NamedExpression, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, ExpressionSet, Generator, NamedExpression, SortOrder} import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateMode, Final, Partial} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} import org.apache.spark.sql.catalyst.plans._ @@ -43,7 +44,7 @@ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec} import 
org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, TimestampNTZType} +import org.apache.spark.sql.types.{ArrayType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, TimestampNTZType} import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration import org.apache.spark.util.io.ChunkedByteBuffer @@ -53,7 +54,7 @@ import com.google.common.base.Objects import org.apache.comet.{CometConf, CometExecIterator, CometRuntimeException, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleEnabled, withInfo} import org.apache.comet.parquet.CometParquetUtils -import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} +import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, OperatorOuterClass, SupportLevel, Unsupported} import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, Operator} import org.apache.comet.serde.QueryPlanSerde.{aggExprToProto, exprToProto, supportedSortType} import org.apache.comet.serde.operator.CometSink @@ -882,6 +883,124 @@ case class CometExpandExec( override lazy val metrics: Map[String, SQLMetric] = Map.empty } +object CometExplodeExec extends CometOperatorSerde[GenerateExec] { + + override def enabledConfig: Option[ConfigEntry[Boolean]] = Some( + CometConf.COMET_EXEC_EXPLODE_ENABLED) + + override def getSupportLevel(op: GenerateExec): SupportLevel = { + if (!op.generator.deterministic) { + return Unsupported(Some("Only deterministic generators are supported")) + } + if (op.generator.children.length != 1) { + return Unsupported(Some("generators with multiple inputs are not supported")) + } + if 
(op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") { + return Unsupported(Some(s"Unsupported generator: ${op.generator.nodeName}")) + } + if (op.outer) { + // DataFusion UnnestExec has different semantics to Spark for this case + // https://github.com/apache/datafusion/issues/19053 + return Incompatible(Some("Empty arrays are not preserved as null outputs when outer=true")) + } + op.generator.children.head.dataType match { + case _: ArrayType => + Compatible() + case _: MapType => + // TODO add support for map types + // https://github.com/apache/datafusion-comet/issues/2837 + Unsupported(Some("Comet only supports explode/explode_outer for arrays, not maps")) + case other => + Unsupported(Some(s"Unsupported data type: $other")) + } + } + + override def convert( + op: GenerateExec, + builder: Operator.Builder, + childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = { + val childExpr = op.generator.children.head + val childExprProto = exprToProto(childExpr, op.child.output) + + if (childExprProto.isEmpty) { + withInfo(op, childExpr) + return None + } + + // Convert the projection expressions (columns to carry forward) + // These are the non-generator output columns + val generatorOutput = op.generatorOutput.toSet + val projectExprs = op.output.filterNot(generatorOutput.contains).map { attr => + exprToProto(attr, op.child.output) + } + + if (projectExprs.exists(_.isEmpty) || childOp.isEmpty) { + withInfo(op, op.output: _*) + return None + } + + val explodeBuilder = OperatorOuterClass.Explode + .newBuilder() + .setChild(childExprProto.get) + .setOuter(op.outer) + .addAllProjectList(projectExprs.map(_.get).asJava) + + Some(builder.setExplode(explodeBuilder).build()) + } + + override def createExec(nativeOp: Operator, op: GenerateExec): CometNativeExec = { + CometExplodeExec( + nativeOp, + op, + op.output, + op.generator, + op.generatorOutput, + op.child, + SerializedPlan(None)) + } +} + +case class CometExplodeExec( + override val 
nativeOp: Operator, + override val originalPlan: SparkPlan, + override val output: Seq[Attribute], + generator: Generator, + generatorOutput: Seq[Attribute], + child: SparkPlan, + override val serializedPlanOpt: SerializedPlan) + extends CometUnaryExec { + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def producedAttributes: AttributeSet = AttributeSet(generatorOutput) + + override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = + this.copy(child = newChild) + + override def stringArgs: Iterator[Any] = Iterator(generator, generatorOutput, output, child) + + override def equals(obj: Any): Boolean = { + obj match { + case other: CometExplodeExec => + this.output == other.output && + this.generator == other.generator && + this.generatorOutput == other.generatorOutput && + this.child == other.child && + this.serializedPlanOpt == other.serializedPlanOpt + case _ => + false + } + } + + override def hashCode(): Int = Objects.hashCode(output, generator, generatorOutput, child) + + override lazy val metrics: Map[String, SQLMetric] = + CometMetricNode.baselineMetrics(sparkContext) ++ + Map( + "input_batches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"), + "input_rows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "output_batches" -> SQLMetrics.createMetric(sparkContext, "number of output batches")) +} + object CometUnionExec extends CometSink[UnionExec] { override def enabledConfig: Option[ConfigEntry[Boolean]] = Some( diff --git a/spark/src/test/resources/tpcds-micro-benchmarks/explode.sql b/spark/src/test/resources/tpcds-micro-benchmarks/explode.sql new file mode 100644 index 0000000000..79eacfda09 --- /dev/null +++ b/spark/src/test/resources/tpcds-micro-benchmarks/explode.sql @@ -0,0 +1,4 @@ +SELECT i_item_sk, explode(array(i_brand_id, i_class_id, i_category_id, i_manufact_id, i_manager_id)) +FROM item +ORDER BY i_item_sk +LIMIT 1000 \ No newline at end of file diff 
--git a/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala
new file mode 100644
index 0000000000..a9ac3deb34
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/exec/CometGenerateExecSuite.scala
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.exec
+
+import org.apache.spark.sql.CometTestBase
+import org.apache.spark.sql.execution.GenerateExec
+import org.apache.spark.sql.functions.col
+
+import org.apache.comet.CometConf
+
+/**
+ * Tests for `explode` / `explode_outer` queries (Spark's `GenerateExec`) under Comet.
+ *
+ * Each test builds a small in-memory DataFrame, applies the generator through `selectExpr`,
+ * and verifies it with `checkSparkAnswerAndOperator` or, where a fallback is expected,
+ * `checkSparkAnswerAndFallbackReason`. All `explode_outer` tests also set the
+ * allow-incompatible config key for `GenerateExec`; the `ignore`d ones are disabled
+ * pending the issue linked above each of them.
+ */
+class CometGenerateExecSuite extends CometTestBase {
+
+  import testImplicits._
+
+  test("explode with simple array") {
+    withSQLConf(
+      CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") {
+      val df = Seq((1, Array(1, 2, 3)), (2, Array(4, 5)), (3, Array(6)))
+        .toDF("id", "arr")
+        .selectExpr("id", "explode(arr) as value")
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  test("explode with empty array") {
+    withSQLConf(
+      CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") {
+      val df = Seq((1, Array(1, 2)), (2, Array.empty[Int]), (3, Array(3)))
+        .toDF("id", "arr")
+        .selectExpr("id", "explode(arr) as value")
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  test("explode with null array") {
+    withSQLConf(
+      CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") {
+      val df = Seq((1, Some(Array(1, 2))), (2, None), (3, Some(Array(3))))
+        .toDF("id", "arr")
+        .selectExpr("id", "explode(arr) as value")
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  test("explode_outer with simple array") {
+    withSQLConf(
+      CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+      CometConf.getOperatorAllowIncompatConfigKey(classOf[GenerateExec]) -> "true",
+      CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") {
+      val df = Seq((1, Array(1, 2, 3)), (2, Array(4, 5)), (3, Array(6)))
+        .toDF("id", "arr")
+        .selectExpr("id", "explode_outer(arr) as value")
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  // https://github.com/apache/datafusion-comet/issues/2838
+  ignore("explode_outer with empty array") {
+    withSQLConf(
+      CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+      // Same opt-in as the enabled explode_outer tests; presumably required once re-enabled.
+      CometConf.getOperatorAllowIncompatConfigKey(classOf[GenerateExec]) -> "true",
+      CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") {
+      val df = Seq((1, Array(1, 2)), (2, Array.empty[Int]), (3, Array(3)))
+        .toDF("id", "arr")
+        .selectExpr("id", "explode_outer(arr) as value")
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  test("explode_outer with null array") {
+    withSQLConf(
+      CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+      CometConf.getOperatorAllowIncompatConfigKey(classOf[GenerateExec]) -> "true",
+      CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") {
+      val df = Seq((1, Some(Array(1, 2))), (2, None), (3, Some(Array(3))))
+        .toDF("id", "arr")
+        .selectExpr("id", "explode_outer(arr) as value")
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  test("explode with multiple columns") {
+    withSQLConf(
+      CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") {
+      val df = Seq((1, "A", Array(1, 2, 3)), (2, "B", Array(4, 5)), (3, "C", Array(6)))
+        .toDF("id", "name", "arr")
+        .selectExpr("id", "name", "explode(arr) as value")
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  test("explode with array of strings") {
+    withSQLConf(
+      CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") {
+      val df = Seq((1, Array("a", "b", "c")), (2, Array("d", "e")), (3, Array("f")))
+        .toDF("id", "arr")
+        .selectExpr("id", "explode(arr) as value")
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  test("explode with filter") {
+    withSQLConf(
+      CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") {
+      val df = Seq((1, Array(1, 2, 3)), (2, Array(4, 5, 6)), (3, Array(7, 8, 9)))
+        .toDF("id", "arr")
+        .selectExpr("id", "explode(arr) as value")
+        .filter(col("value") > 5)
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  test("explode fallback when disabled") {
+    withSQLConf(
+      CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "false") {
+      val df = Seq((1, Array(1, 2, 3)), (2, Array(4, 5)))
+        .toDF("id", "arr")
+        .selectExpr("id", "explode(arr) as value")
+      checkSparkAnswerAndFallbackReason(
+        df,
+        "Native support for operator GenerateExec is disabled")
+    }
+  }
+
+  test("explode with map input falls back") {
+    withSQLConf(
+      CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") {
+      val df = Seq((1, Map("a" -> 1, "b" -> 2)), (2, Map("c" -> 3)))
+        .toDF("id", "map")
+        .selectExpr("id", "explode(map) as (key, value)")
+      checkSparkAnswerAndFallbackReason(
+        df,
+        "Comet only supports explode/explode_outer for arrays, not maps")
+    }
+  }
+
+  test("explode with nullable projected column") {
+    withSQLConf(
+      CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") {
+      val df = Seq((1, Some("A"), Array(1, 2)), (2, None, Array(3, 4)), (3, Some("C"), Array(5)))
+        .toDF("id", "name", "arr")
+        .selectExpr("id", "name", "explode(arr) as value")
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  // https://github.com/apache/datafusion-comet/issues/2838
+  ignore("explode_outer with nullable projected column") {
+    withSQLConf(
+      CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+      // Same opt-in as the enabled explode_outer tests; presumably required once re-enabled.
+      CometConf.getOperatorAllowIncompatConfigKey(classOf[GenerateExec]) -> "true",
+      CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") {
+      val df =
+        Seq((1, Some("A"), Array(1, 2)), (2, None, Array.empty[Int]), (3, Some("C"), Array(5)))
+          .toDF("id", "name", "arr")
+          .selectExpr("id", "name", "explode_outer(arr) as value")
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  test("explode with mixed null, empty, and non-empty arrays") {
+    withSQLConf(
+      CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") {
+      val df = Seq(
+        (1, Some(Array(1, 2))),
+        (2, None),
+        (3, Some(Array.empty[Int])),
+        (4, Some(Array(3))),
+        (5, None),
+        (6, Some(Array(4, 5, 6))))
+        .toDF("id", "arr")
+        .selectExpr("id", "explode(arr) as value")
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  // https://github.com/apache/datafusion-comet/issues/2838
+  ignore("explode_outer with mixed null, empty, and non-empty arrays") {
+    withSQLConf(
+      CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+      // Same opt-in as the enabled explode_outer tests; presumably required once re-enabled.
+      CometConf.getOperatorAllowIncompatConfigKey(classOf[GenerateExec]) -> "true",
+      CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") {
+      val df = Seq(
+        (1, Some(Array(1, 2))),
+        (2, None),
+        (3, Some(Array.empty[Int])),
+        (4, Some(Array(3))),
+        (5, None),
+        (6, Some(Array(4, 5, 6))))
+        .toDF("id", "arr")
+        .selectExpr("id", "explode_outer(arr) as value")
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+  test("explode with multiple nullable columns") {
+    withSQLConf(
+      CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
+      CometConf.COMET_EXEC_EXPLODE_ENABLED.key -> "true") {
+      val df = Seq(
+        (Some(1), Some("A"), Some(100), Array(1, 2)),
+        (None, Some("B"), None, Array(3)),
+        (Some(3), None, Some(300), Array(4, 5)),
+        (None, None, None, Array(6)))
+        .toDF("id", "name", "value", "arr")
+        .selectExpr("id", "name", "value", "explode(arr) as element")
+      checkSparkAnswerAndOperator(df)
+    }
+  }
+
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala index a672d0937d..d9c49bc596 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala @@ -76,6 +76,7 @@ object CometTPCDSMicroBenchmark extends CometTPCQueryBenchmarkBase { "join_left_outer", "join_semi", "rlike", + "explode", "to_json") override def runQueries(